From 7cf7d7455d17c7a30266edef6bc6b21b9b86cf03 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 12 Feb 2013 20:46:39 +0100 Subject: [PATCH] [#1048] Make sure the promise is not notified multiple times on failure --- .../java/io/netty/handler/ssl/SslHandler.java | 6 +- .../io/netty/channel/AbstractChannel.java | 2 +- .../channel/ChannelFlushPromiseNotifier.java | 63 +++++++++++++++---- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 50f5ce9a47..32fcc7a81d 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -157,7 +157,7 @@ public class SslHandler private volatile ChannelHandlerContext ctx; private final SSLEngine engine; private final Executor delegatedTaskExecutor; - private final ChannelFlushPromiseNotifier flushFutureNotifier = new ChannelFlushPromiseNotifier(); + private final ChannelFlushPromiseNotifier flushFutureNotifier = new ChannelFlushPromiseNotifier(true); private final boolean startTls; private boolean sentFirstMessage; @@ -442,10 +442,10 @@ public class SslHandler } if (ctx.executor() == ctx.channel().eventLoop()) { - flushFutureNotifier.addFlushFuture(promise, in.readableBytes()); + flushFutureNotifier.add(promise, in.readableBytes()); } else { synchronized (flushFutureNotifier) { - flushFutureNotifier.addFlushFuture(promise, in.readableBytes()); + flushFutureNotifier.add(promise, in.readableBytes()); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index f5b3adec31..951a6b6fb2 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -833,7 +833,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private ChannelFuture flushNotifier(ChannelPromise promise) { // Append flush future to the notification list. if (promise != voidPromise) { - flushFutureNotifier.addFlushFuture(promise, outboundBufSize()); + flushFutureNotifier.add(promise, outboundBufSize()); } return promise; } diff --git a/transport/src/main/java/io/netty/channel/ChannelFlushPromiseNotifier.java b/transport/src/main/java/io/netty/channel/ChannelFlushPromiseNotifier.java index 08cf543ccb..4365bf67db 100644 --- a/transport/src/main/java/io/netty/channel/ChannelFlushPromiseNotifier.java +++ b/transport/src/main/java/io/netty/channel/ChannelFlushPromiseNotifier.java @@ -26,25 +26,46 @@ public final class ChannelFlushPromiseNotifier { private long writeCounter; private final Queue flushCheckpoints = new ArrayDeque(); + private final boolean tryNotify; + + /** + * Create a new instance + * + * @param tryNotify if {@code true} the {@link ChannelPromise}s will get notified with + * {@link ChannelPromise#trySuccess()} and {@link ChannelPromise#tryFailure(Throwable)}. + * Otherwise {@link ChannelPromise#setSuccess()} and {@link ChannelPromise#setFailure(Throwable)} + * is used + */ + public ChannelFlushPromiseNotifier(boolean tryNotify) { + this.tryNotify = tryNotify; + } + + /** + * Create a new instance which will use {@link ChannelPromise#setSuccess()} and + * {@link ChannelPromise#setFailure(Throwable)} to notify the {@link ChannelPromise}s. + */ + public ChannelFlushPromiseNotifier() { + this(false); + } /** * Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given * pendingDataSize was reached. */ - public ChannelFlushPromiseNotifier addFlushFuture(ChannelPromise future, int pendingDataSize) { - if (future == null) { - throw new NullPointerException("future"); + public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) { + if (promise == null) { + throw new NullPointerException("promise"); } if (pendingDataSize < 0) { throw new IllegalArgumentException("pendingDataSize must be >= 0 but was" + pendingDataSize); } long checkpoint = writeCounter + pendingDataSize; - if (future instanceof FlushCheckpoint) { - FlushCheckpoint cp = (FlushCheckpoint) future; + if (promise instanceof FlushCheckpoint) { + FlushCheckpoint cp = (FlushCheckpoint) promise; cp.flushCheckpoint(checkpoint); flushCheckpoints.add(cp); } else { - flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, future)); + flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise)); } return this; } @@ -68,7 +89,7 @@ public final class ChannelFlushPromiseNotifier { } /** - * Notify all {@link ChannelFuture}s that were registered with {@link #addFlushFuture(ChannelPromise, int)} and + * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and * their pendingDatasize is smaller after the the current writeCounter returned by {@link #writeCounter()}. * * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and @@ -80,7 +101,7 @@ public final class ChannelFlushPromiseNotifier { } /** - * Notify all {@link ChannelFuture}s that were registered with {@link #addFlushFuture(ChannelPromise, int)} and + * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and * their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}. * * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and @@ -97,13 +118,17 @@ public final class ChannelFlushPromiseNotifier { if (cp == null) { break; } - cp.future().setFailure(cause); + if (tryNotify) { + cp.future().tryFailure(cause); + } else { + cp.future().setFailure(cause); + } } return this; } /** - * Notify all {@link ChannelFuture}s that were registered with {@link #addFlushFuture(ChannelPromise, int)} and + * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and * their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using * the given cause1. * @@ -125,7 +150,11 @@ public final class ChannelFlushPromiseNotifier { if (cp == null) { break; } - cp.future().setFailure(cause2); + if (tryNotify) { + cp.future().tryFailure(cause2); + } else { + cp.future().setFailure(cause2); + } } return this; } @@ -155,9 +184,17 @@ public final class ChannelFlushPromiseNotifier { flushCheckpoints.remove(); if (cause == null) { - cp.future().setSuccess(); + if (tryNotify) { + cp.future().trySuccess(); + } else { + cp.future().setSuccess(); + } } else { - cp.future().setFailure(cause); + if (tryNotify) { + cp.future().tryFailure(cause); + } else { + cp.future().setFailure(cause); + } } }