diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java index 9e437703a3..f8371a0a32 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java @@ -148,25 +148,25 @@ public class RxtxChannel extends OioByteStreamChannel { public void run() { try { doInit(); - promise.setSuccess(); + safeSetSuccess(promise); if (!wasActive && isActive()) { pipeline().fireChannelActive(); } } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); } } }, waitTime, TimeUnit.MILLISECONDS); } else { doInit(); - promise.setSuccess(); + safeSetSuccess(promise); if (!wasActive && isActive()) { pipeline().fireChannelActive(); } } } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 4cc2b75e21..deaf67a2a0 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -424,7 +424,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); - promise.setFailure(t); + safeSetFailure(promise, t); } } } @@ -438,7 +438,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } doRegister(); registered = true; - promise.setSuccess(); + safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { pipeline.fireChannelActive(); @@ -447,11 +447,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); - if (!promise.tryFailure(t)) { - logger.warn( - "Tried to fail the registration promise, but it is complete already. " + - "Swallowing the cause of the registration failure:", t); - } + safeSetFailure(promise, t); } } @@ -478,11 +474,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doBind(localAddress); } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); return; } - promise.setSuccess(); if (!wasActive && isActive()) { invokeLater(new Runnable() { @@ -492,6 +487,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } }); } + + safeSetSuccess(promise); } @Override @@ -504,11 +501,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doDisconnect(); } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); return; } - promise.setSuccess(); + if (wasActive && !isActive()) { invokeLater(new Runnable() { @Override @@ -517,6 +514,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } }); } + + safeSetSuccess(promise); closeIfClosed(); // doDisconnect() might have closed the channel } @@ -538,7 +537,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (closeFuture.isDone()) { // Closed already. - promise.setSuccess(); + safeSetSuccess(promise); return; } @@ -549,10 +548,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doClose(); closeFuture.setClosed(); - promise.setSuccess(); + safeSetSuccess(promise); } catch (Throwable t) { closeFuture.setClosed(); - promise.setFailure(t); + safeSetFailure(promise, t); } // Fail all the queued messages @@ -590,7 +589,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } if (!registered) { - promise.setSuccess(); + safeSetSuccess(promise); return; } @@ -601,18 +600,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } finally { if (registered) { registered = false; - promise.setSuccess(); invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelUnregistered(); } }); + safeSetSuccess(promise); } else { // Some transports like local and AIO does not allow the deregistration of // an open channel. Their doDeregister() calls close(). Consequently, // close() calls deregister() again - no need to fire channelUnregistered. - promise.setSuccess(); + safeSetSuccess(promise); } } } @@ -641,9 +640,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (!isActive()) { // Mark the write request as failure if the channel is inactive. if (isOpen()) { - promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION); + safeSetFailure(promise, NOT_YET_CONNECTED_EXCEPTION); } else { - promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); + safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION); } // release message now to prevent resource-leak ReferenceCountUtil.release(msg); @@ -712,10 +711,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return true; } - promise.setFailure(CLOSED_CHANNEL_EXCEPTION); + safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION); return false; } + /** + * Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message. + */ + protected final void safeSetSuccess(ChannelPromise promise) { + if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { + logger.warn("Failed to mark a promise as success because it is done already: {}", promise); + } + } + + /** + * Marks the specified {@code promise} as failure. If the {@code promise} is done already, log a message. + */ + protected final void safeSetFailure(ChannelPromise promise, Throwable cause) { + if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { + logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); + } + } + protected final void closeIfClosed() { if (isOpen()) { return; diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 5835e2b77a..7a2794a587 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -89,7 +89,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } private void reject(ChannelPromise promise) { - promise.setFailure(new UnsupportedOperationException()); + safeSetFailure(promise, new UnsupportedOperationException()); } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index a6419e46c3..60eae3943b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -281,8 +281,7 @@ public final class ChannelOutboundBuffer { flushed = flushed + 1 & buffer.length - 1; safeRelease(msg); - - promise.trySuccess(); + safeSuccess(promise); decrementPendingOutboundBytes(size); return true; @@ -349,7 +348,8 @@ public final class ChannelOutboundBuffer { nioBufferSize += readableBytes; int count = entry.count; if (count == -1) { - entry.count = count = buf.nioBufferCount(); + //noinspection ConstantValueVariableUse + entry.count = count = buf.nioBufferCount(); } int neededSpace = nioBufferCount + count; if (neededSpace > nioBuffers.length) { @@ -527,9 +527,15 @@ public final class ChannelOutboundBuffer { } } + private static void safeSuccess(ChannelPromise promise) { + if (!promise.trySuccess()) { + logger.warn("Failed to mark a promise as success because it is done already: {}", promise); + } + } + private static void safeFail(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { - logger.warn("Promise done already: {} - new exception is:", promise, cause); + logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } } @@ -561,7 +567,7 @@ public final class ChannelOutboundBuffer { } public long totalPendingWriteBytes() { - return this.totalPendingSize; + return totalPendingSize; } private static final class Entry { diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 5b01a9c58d..62777685de 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -334,7 +334,7 @@ public class EmbeddedChannel extends AbstractChannel { private class DefaultUnsafe extends AbstractUnsafe { @Override public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - promise.setSuccess(); + safeSetSuccess(promise); } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 66711616f0..25ffb6ccad 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -320,13 +320,13 @@ public class LocalChannel extends AbstractChannel { @Override public void connect(final SocketAddress remoteAddress, SocketAddress localAddress, final ChannelPromise promise) { - if (!ensureOpen(promise)) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } if (state == 2) { Exception cause = new AlreadyConnectedException(); - promise.setFailure(cause); + safeSetFailure(promise, cause); pipeline().fireExceptionCaught(cause); return; } @@ -348,7 +348,7 @@ public class LocalChannel extends AbstractChannel { try { doBind(localAddress); } catch (Throwable t) { - promise.setFailure(t); + safeSetFailure(promise, t); close(voidPromise()); return; } @@ -357,7 +357,7 @@ public class LocalChannel extends AbstractChannel { Channel boundChannel = LocalChannelRegistry.get(remoteAddress); if (!(boundChannel instanceof LocalServerChannel)) { Exception cause = new ChannelException("connection refused"); - promise.setFailure(cause); + safeSetFailure(promise, cause); close(voidPromise()); return; } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 8fe1da004f..09a0d7262f 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -156,7 +156,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override public void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - if (!ensureOpen(promise)) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index d060abc8e8..eff7651cff 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -65,7 +65,7 @@ public abstract class AbstractOioChannel extends AbstractChannel { try { boolean wasActive = isActive(); doConnect(remoteAddress, localAddress); - promise.setSuccess(); + safeSetSuccess(promise); if (!wasActive && isActive()) { pipeline().fireChannelActive(); } @@ -75,7 +75,7 @@ public abstract class AbstractOioChannel extends AbstractChannel { newT.setStackTrace(t.getStackTrace()); t = newT; } - promise.setFailure(t); + safeSetFailure(promise, t); closeIfClosed(); } }