[#1048] Make sure the promise is not notified multiple times on failure

This commit is contained in:
Norman Maurer 2013-02-12 20:46:39 +01:00
parent 17e37fdfe6
commit 7cf7d7455d
3 changed files with 54 additions and 17 deletions

View File

@ -157,7 +157,7 @@ public class SslHandler
private volatile ChannelHandlerContext ctx;
private final SSLEngine engine;
private final Executor delegatedTaskExecutor;
private final ChannelFlushPromiseNotifier flushFutureNotifier = new ChannelFlushPromiseNotifier();
private final ChannelFlushPromiseNotifier flushFutureNotifier = new ChannelFlushPromiseNotifier(true);
private final boolean startTls;
private boolean sentFirstMessage;
@ -442,10 +442,10 @@ public class SslHandler
}
if (ctx.executor() == ctx.channel().eventLoop()) {
flushFutureNotifier.addFlushFuture(promise, in.readableBytes());
flushFutureNotifier.add(promise, in.readableBytes());
} else {
synchronized (flushFutureNotifier) {
flushFutureNotifier.addFlushFuture(promise, in.readableBytes());
flushFutureNotifier.add(promise, in.readableBytes());
}
}

View File

@ -833,7 +833,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private ChannelFuture flushNotifier(ChannelPromise promise) {
// Append flush future to the notification list.
if (promise != voidPromise) {
flushFutureNotifier.addFlushFuture(promise, outboundBufSize());
flushFutureNotifier.add(promise, outboundBufSize());
}
return promise;
}

View File

@ -26,25 +26,46 @@ public final class ChannelFlushPromiseNotifier {
private long writeCounter;
private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
private final boolean tryNotify;
/**
* Create a new instance
*
* @param tryNotify if {@code true} the {@link ChannelPromise}s will get notified with
* {@link ChannelPromise#trySuccess()} and {@link ChannelPromise#tryFailure(Throwable)}.
* Otherwise {@link ChannelPromise#setSuccess()} and {@link ChannelPromise#setFailure(Throwable)}
* is used
*/
public ChannelFlushPromiseNotifier(boolean tryNotify) {
this.tryNotify = tryNotify;
}
/**
* Create a new instance which will use {@link ChannelPromise#setSuccess()} and
* {@link ChannelPromise#setFailure(Throwable)} to notify the {@link ChannelPromise}s.
*/
public ChannelFlushPromiseNotifier() {
this(false);
}
/**
* Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
* pendingDataSize was reached.
*/
public ChannelFlushPromiseNotifier addFlushFuture(ChannelPromise future, int pendingDataSize) {
if (future == null) {
throw new NullPointerException("future");
public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
if (promise == null) {
throw new NullPointerException("promise");
}
if (pendingDataSize < 0) {
throw new IllegalArgumentException("pendingDataSize must be >= 0 but was" + pendingDataSize);
}
long checkpoint = writeCounter + pendingDataSize;
if (future instanceof FlushCheckpoint) {
FlushCheckpoint cp = (FlushCheckpoint) future;
if (promise instanceof FlushCheckpoint) {
FlushCheckpoint cp = (FlushCheckpoint) promise;
cp.flushCheckpoint(checkpoint);
flushCheckpoints.add(cp);
} else {
flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, future));
flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise));
}
return this;
}
@ -68,7 +89,7 @@ public final class ChannelFlushPromiseNotifier {
}
/**
* Notify all {@link ChannelFuture}s that were registered with {@link #addFlushFuture(ChannelPromise, int)} and
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
* their pendingDatasize is smaller after the the current writeCounter returned by {@link #writeCounter()}.
*
* After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
@ -80,7 +101,7 @@ public final class ChannelFlushPromiseNotifier {
}
/**
* Notify all {@link ChannelFuture}s that were registered with {@link #addFlushFuture(ChannelPromise, int)} and
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
* their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}.
*
* After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
@ -97,13 +118,17 @@ public final class ChannelFlushPromiseNotifier {
if (cp == null) {
break;
}
cp.future().setFailure(cause);
if (tryNotify) {
cp.future().tryFailure(cause);
} else {
cp.future().setFailure(cause);
}
}
return this;
}
/**
* Notify all {@link ChannelFuture}s that were registered with {@link #addFlushFuture(ChannelPromise, int)} and
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
* their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using
* the given cause1.
*
@ -125,7 +150,11 @@ public final class ChannelFlushPromiseNotifier {
if (cp == null) {
break;
}
cp.future().setFailure(cause2);
if (tryNotify) {
cp.future().tryFailure(cause2);
} else {
cp.future().setFailure(cause2);
}
}
return this;
}
@ -155,9 +184,17 @@ public final class ChannelFlushPromiseNotifier {
flushCheckpoints.remove();
if (cause == null) {
cp.future().setSuccess();
if (tryNotify) {
cp.future().trySuccess();
} else {
cp.future().setSuccess();
}
} else {
cp.future().setFailure(cause);
if (tryNotify) {
cp.future().tryFailure(cause);
} else {
cp.future().setFailure(cause);
}
}
}