From fee1d9e75c7a510af831bcff31eb5c57358ea351 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 10 Feb 2014 14:52:24 -0800 Subject: [PATCH] Make most outbound operations cancellable / More robust promise update - Inspired by #2214 by @normanmaurer - Call setUncancellable() before performing an outbound operation - Add safeSetSuccess/Failure() and use them wherever --- .../io/netty/channel/rxtx/RxtxChannel.java | 8 ++-- .../io/netty/channel/AbstractChannel.java | 48 ++++++++++++------- .../netty/channel/AbstractServerChannel.java | 2 +- .../netty/channel/ChannelOutboundBuffer.java | 16 +++++-- .../channel/embedded/EmbeddedChannel.java | 2 +- .../io/netty/channel/local/LocalChannel.java | 8 ++-- .../netty/channel/nio/AbstractNioChannel.java | 2 +- .../netty/channel/oio/AbstractOioChannel.java | 4 +- 8 files changed, 55 insertions(+), 35 deletions(-) diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java index f1d4da76e1..ba5961994d 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java @@ -149,25 +149,25 @@ public class RxtxChannel extends OioByteStreamChannel { public void run() { try { doInit(); - promise.setSuccess(); + safeSetSuccess(promise); if (!wasActive && isActive()) { pipeline().fireChannelActive(); } } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); } } }, waitTime, TimeUnit.MILLISECONDS); } else { doInit(); - promise.setSuccess(); + safeSetSuccess(promise); if (!wasActive && isActive()) { pipeline().fireChannelActive(); } } } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 194c4a7f9a..0d410c9239 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -409,7 +409,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); - promise.setFailure(t); + safeSetFailure(promise, t); } } } @@ -423,7 +423,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } doRegister(); registered = true; - promise.setSuccess(); + safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { pipeline.fireChannelActive(); @@ -432,11 +432,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); - if (!promise.tryFailure(t)) { - logger.warn( - "Tried to fail the registration promise, but it is complete already. " + - "Swallowing the cause of the registration failure:", t); - } + safeSetFailure(promise, t); } } @@ -463,7 +459,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doBind(localAddress); } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); return; } @@ -475,7 +471,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } }); } - promise.setSuccess(); + safeSetSuccess(promise); } @Override @@ -488,7 +484,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doDisconnect(); } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); return; } @@ -500,7 +496,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } }); } - promise.setSuccess(); + safeSetSuccess(promise); closeIfClosed(); // doDisconnect() might have closed the channel } @@ -522,7 +518,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (closeFuture.isDone()) { // Closed already. - promise.setSuccess(); + safeSetSuccess(promise); return; } @@ -533,10 +529,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doClose(); closeFuture.setClosed(); - promise.setSuccess(); + safeSetSuccess(promise); } catch (Throwable t) { closeFuture.setClosed(); - promise.setFailure(t); + safeSetFailure(promise, t); } // Fail all the queued messages @@ -607,9 +603,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (!isActive()) { // Mark the write request as failure if the channel is inactive. if (isOpen()) { - promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION); + safeSetFailure(promise, NOT_YET_CONNECTED_EXCEPTION); } else { - promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); + safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION); } // release message now to prevent resource-leak ReferenceCountUtil.release(msg); @@ -675,10 +671,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return true; } - promise.setFailure(CLOSED_CHANNEL_EXCEPTION); + safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION); return false; } + /** + * Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message. + */ + protected final void safeSetSuccess(ChannelPromise promise) { + if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { + logger.warn("Failed to mark a promise as success because it is done already: {}", promise); + } + } + + /** + * Marks the specified {@code promise} as failure. If the {@code promise} is done already, log a message. + */ + protected final void safeSetFailure(ChannelPromise promise, Throwable cause) { + if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { + logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); + } + } + protected final void closeIfClosed() { if (isOpen()) { return; diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index ccf9b47faf..5bde3d2454 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -97,7 +97,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } private void reject(ChannelPromise promise) { - promise.setFailure(new UnsupportedOperationException()); + safeSetFailure(promise, new UnsupportedOperationException()); } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 1024672233..b41abb12ee 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -28,8 +28,8 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -325,8 +325,7 @@ public final class ChannelOutboundBuffer { flushed = flushed + 1 & buffer.length - 1; safeRelease(msg); - - promise.trySuccess(); + safeSuccess(promise); decrementPendingOutboundBytes(size); return true; @@ -393,6 +392,7 @@ public final class ChannelOutboundBuffer { nioBufferSize += readableBytes; int count = entry.count; if (count == -1) { + //noinspection ConstantValueVariableUse entry.count = count = buf.nioBufferCount(); } int neededSpace = nioBufferCount + count; @@ -577,9 +577,15 @@ public final class ChannelOutboundBuffer { } } + private static void safeSuccess(ChannelPromise promise) { + if (!promise.trySuccess()) { + logger.warn("Failed to mark a promise as success because it is done already: {}", promise); + } + } + private static void safeFail(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { - logger.warn("Promise done already: {} - new exception is:", promise, cause); + logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } } @@ -611,7 +617,7 @@ public final class ChannelOutboundBuffer { } public long totalPendingWriteBytes() { - return this.totalPendingSize; + return totalPendingSize; } private static final class Entry { diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 0f272855dd..9bd6a0db7a 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -343,7 +343,7 @@ public class EmbeddedChannel extends AbstractChannel { private class DefaultUnsafe extends AbstractUnsafe { @Override public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - promise.setSuccess(); + safeSetSuccess(promise); } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index e90584fac1..952d78a63a 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -320,13 +320,13 @@ public class LocalChannel extends AbstractChannel { @Override public void connect(final SocketAddress remoteAddress, SocketAddress localAddress, final ChannelPromise promise) { - if (!ensureOpen(promise)) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } if (state == State.CONNECTED) { Exception cause = new AlreadyConnectedException(); - promise.setFailure(cause); + safeSetFailure(promise, cause); pipeline().fireExceptionCaught(cause); return; } @@ -348,7 +348,7 @@ public class LocalChannel extends AbstractChannel { try { doBind(localAddress); } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); close(voidPromise()); return; } @@ -357,7 +357,7 @@ public class LocalChannel extends AbstractChannel { Channel boundChannel = LocalChannelRegistry.get(remoteAddress); if (!(boundChannel instanceof LocalServerChannel)) { Exception cause = new ChannelException("connection refused"); - promise.setFailure(cause); + safeSetFailure(promise, cause); close(voidPromise()); return; } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index c2bac9a873..5c59ed6cea 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -156,7 +156,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override public void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - if (!ensureOpen(promise)) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index 3c4453fbbe..afc275d083 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -62,7 +62,7 @@ public abstract class AbstractOioChannel extends AbstractChannel { try { boolean wasActive = isActive(); doConnect(remoteAddress, localAddress); - promise.setSuccess(); + safeSetSuccess(promise); if (!wasActive && isActive()) { pipeline().fireChannelActive(); } @@ -72,7 +72,7 @@ public abstract class AbstractOioChannel extends AbstractChannel { newT.setStackTrace(t.getStackTrace()); t = newT; } - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); } }