From aa8bdb5d6b7f89d56712c2d177c27857a17d79cd Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 16 Sep 2017 17:20:01 -0700 Subject: [PATCH] Fix assertion error when closing / shutdown native channel and SO_LINGER is set. Motivation: When SO_LINGER is used we run doClose() on the GlobalEventExecutor by default so we need to ensure we schedule all code that needs to be run on the EventLoop on the EventLoop in doClose. Beside this there are also threading issues when calling shutdownOutput(...) Modifications: - Schedule removal from EventLoop to the EventLoop - Correctly handle shutdownOutput and shutdown in respect with threading-model - Add unit tests Result: Fixes [#7159]. --- .../SocketShutdownOutputBySelfTest.java | 47 ++++++ .../channel/epoll/AbstractEpollChannel.java | 20 ++- .../epoll/AbstractEpollServerChannel.java | 11 -- .../epoll/AbstractEpollStreamChannel.java | 117 ++++++------- .../channel/epoll/EpollDatagramChannel.java | 11 -- .../channel/epoll/EpollSocketChannelTest.java | 19 +++ .../channel/kqueue/AbstractKQueueChannel.java | 25 ++- .../kqueue/AbstractKQueueServerChannel.java | 11 -- .../kqueue/AbstractKQueueStreamChannel.java | 159 ++++++++---------- .../channel/kqueue/KQueueDatagramChannel.java | 10 -- .../kqueue/KQueueChannelConfigTest.java | 24 +++ .../io/netty/channel/AbstractChannel.java | 65 +++++-- .../netty/channel/AbstractServerChannel.java | 12 -- .../channel/embedded/EmbeddedChannel.java | 7 - .../io/netty/channel/local/LocalChannel.java | 8 - .../nio/AbstractNioMessageChannel.java | 10 +- .../oio/AbstractOioMessageChannel.java | 8 - .../socket/nio/NioDatagramChannel.java | 10 -- .../channel/socket/nio/NioSocketChannel.java | 145 ++++++---------- .../socket/oio/OioDatagramChannel.java | 13 +- .../channel/socket/oio/OioSocketChannel.java | 73 ++++---- 21 files changed, 387 insertions(+), 418 deletions(-) diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java index e7f288b146..38e6b46c86 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.WriteBufferWaterMark; @@ -193,6 +194,52 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest { } } + @Test(timeout = 30000) + public void testShutdownOutputSoLingerNoAssertError() throws Throwable { + run(); + } + + public void testShutdownOutputSoLingerNoAssertError(Bootstrap cb) throws Throwable { + testShutdownSoLingerNoAssertError0(cb, true); + } + + @Test(timeout = 30000) + public void testShutdownSoLingerNoAssertError() throws Throwable { + run(); + } + + public void testShutdownSoLingerNoAssertError(Bootstrap cb) throws Throwable { + testShutdownSoLingerNoAssertError0(cb, false); + } + + private void testShutdownSoLingerNoAssertError0(Bootstrap cb, boolean output) throws Throwable { + ServerSocket ss = new ServerSocket(); + Socket s = null; + + ChannelFuture cf = null; + try { + ss.bind(newSocketAddress()); + cf = cb.option(ChannelOption.SO_LINGER, 1).handler(new ChannelInboundHandlerAdapter()) + .connect(ss.getLocalSocketAddress()).sync(); + s = ss.accept(); + + cf.sync(); + + if (output) { + ((SocketChannel) cf.channel()).shutdownOutput().sync(); + } else { + ((SocketChannel) cf.channel()).shutdown().sync(); + } + } finally { + if (s != null) { + s.close(); + } + if (cf != null) { + cf.channel().close(); + } + ss.close(); + } + } private static void checkThrowable(Throwable cause) throws Throwable { // Depending on OIO / NIO both are ok if (!(cause instanceof ClosedChannelException) && !(cause instanceof SocketException)) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 352a74bbb1..22235c4aba 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -171,7 +171,25 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } if (isRegistered()) { - doDeregister(); + // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor + // if SO_LINGER is used. + // + // See https://github.com/netty/netty/issues/7159 + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + doDeregister(); + } else { + loop.execute(new Runnable() { + @Override + public void run() { + try { + doDeregister(); + } catch (Throwable cause) { + pipeline().fireExceptionCaught(cause); + } + } + }); + } } } finally { socket.close(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index 52ad42e3ce..ebda9ebfe7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -23,7 +23,6 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.ServerChannel; -import io.netty.util.internal.UnstableApi; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -73,16 +72,6 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im throw new UnsupportedOperationException(); } - @UnstableApi - @Override - protected final void doShutdownOutput(Throwable cause) throws Exception { - try { - super.doShutdownOutput(cause); - } finally { - close(); - } - } - abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception; final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 03f5559458..73c316b8e3 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -545,29 +545,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im @UnstableApi @Override - protected final void doShutdownOutput(Throwable cause) throws Exception { - try { - // The native socket implementation may throw a NotYetConnected exception when we attempt to shut it down. - // However NIO doesn't propagate an exception in the same situation (write failure), and we just want to - // update the socket state to flag that it has been shutdown. So don't use a voidPromise but instead create - // a new promise and ignore the results. - shutdownOutput0(newPromise()); - } finally { - super.doShutdownOutput(cause); - } - } - - private void shutdownOutput0(final ChannelPromise promise) { - try { - try { - socket.shutdown(false, true); - } finally { - ((AbstractUnsafe) unsafe()).shutdownOutput(); - } - promise.setSuccess(); - } catch (Throwable cause) { - promise.setFailure(cause); - } + protected final void doShutdownOutput() throws Exception { + socket.shutdown(false, true); } private void shutdownInput0(final ChannelPromise promise) { @@ -579,19 +558,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } } - private void shutdown0(final ChannelPromise promise) { - try { - try { - socket.shutdown(true, true); - } finally { - ((AbstractUnsafe) unsafe()).shutdownOutput(); - } - promise.setSuccess(); - } catch (Throwable cause) { - promise.setFailure(cause); - } - } - @Override public boolean isOutputShutdown() { return socket.isOutputShutdown(); @@ -614,27 +580,18 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { - Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + ((AbstractUnsafe) unsafe()).shutdownOutput(promise); + } else { + loop.execute(new Runnable() { @Override public void run() { - shutdownOutput0(promise); + ((AbstractUnsafe) unsafe()).shutdownOutput(promise); } }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdownOutput0(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownOutput0(promise); - } - }); - } } + return promise; } @@ -676,30 +633,52 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im @Override public ChannelFuture shutdown(final ChannelPromise promise) { - Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { + ChannelFuture shutdownOutputFuture = shutdownOutput(); + if (shutdownOutputFuture.isDone()) { + shutdownOutputDone(shutdownOutputFuture, promise); + } else { + shutdownOutputFuture.addListener(new ChannelFutureListener() { @Override - public void run() { - shutdown0(promise); + public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception { + shutdownOutputDone(shutdownOutputFuture, promise); } }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdown0(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdown0(promise); - } - }); - } } return promise; } + private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) { + ChannelFuture shutdownInputFuture = shutdownInput(); + if (shutdownInputFuture.isDone()) { + shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); + } else { + shutdownInputFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception { + shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); + } + }); + } + } + + private static void shutdownDone(ChannelFuture shutdownOutputFuture, + ChannelFuture shutdownInputFuture, + ChannelPromise promise) { + Throwable shutdownOutputCause = shutdownOutputFuture.cause(); + Throwable shutdownInputCause = shutdownInputFuture.cause(); + if (shutdownOutputCause != null) { + if (shutdownInputCause != null) { + logger.debug("Exception suppressed because a previous exception occurred.", + shutdownInputCause); + } + promise.setFailure(shutdownOutputCause); + } else if (shutdownInputCause != null) { + promise.setFailure(shutdownInputCause); + } else { + promise.setSuccess(); + } + } + @Override protected void doClose() throws Exception { try { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index e3ee726535..8cbc31adde 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -31,7 +31,6 @@ import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.IovArray; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.internal.StringUtil; -import io.netty.util.internal.UnstableApi; import java.io.IOException; import java.net.InetAddress; @@ -423,16 +422,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements return false; } - @UnstableApi - @Override - protected void doShutdownOutput(Throwable cause) throws Exception { - // UDP sockets are not connected. A write failure may just be temporary or disconnect was called. - ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); - if (channelOutboundBuffer != null) { - channelOutboundBuffer.remove(cause); - } - } - @Override protected void doClose() throws Exception { super.doClose(); diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java index 9637006217..6f269e3e34 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java @@ -17,6 +17,7 @@ package io.netty.channel.epoll; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import org.junit.Assert; import org.junit.Test; @@ -98,4 +99,22 @@ public class EpollSocketChannelTest { Assert.assertTrue(info.rcvSpace() >= 0); Assert.assertTrue(info.totalRetrans() >= 0); } + + // See https://github.com/netty/netty/issues/7159 + @Test + public void testSoLingerNoAssertError() throws Exception { + EventLoopGroup group = new EpollEventLoopGroup(1); + + try { + Bootstrap bootstrap = new Bootstrap(); + EpollSocketChannel ch = (EpollSocketChannel) bootstrap.group(group) + .channel(EpollSocketChannel.class) + .option(ChannelOption.SO_LINGER, 10) + .handler(new ChannelInboundHandlerAdapter()) + .bind(new InetSocketAddress(0)).syncUninterruptibly().channel(); + ch.close().syncUninterruptibly(); + } finally { + group.shutdownGracefully(); + } + } } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 94a9fc637e..9ed07d1f65 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -137,9 +137,28 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan try { if (isRegistered()) { // The FD will be closed, which should take care of deleting any associated events from kqueue, but - // since we rely upon jniSelfRef to be consistent we make sure that we clear this reference out for all] - // events which are pending in kqueue to avoid referencing a deleted pointer at a later time. - doDeregister(); + // since we rely upon jniSelfRef to be consistent we make sure that we clear this reference out for + // all events which are pending in kqueue to avoid referencing a deleted pointer at a later time. + + // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor + // if SO_LINGER is used. + // + // See https://github.com/netty/netty/issues/7159 + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + doDeregister(); + } else { + loop.execute(new Runnable() { + @Override + public void run() { + try { + doDeregister(); + } catch (Throwable cause) { + pipeline().fireExceptionCaught(cause); + } + } + }); + } } } finally { socket.close(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java index 0760527761..4e7534dff1 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java @@ -20,7 +20,6 @@ import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.ServerChannel; import io.netty.util.internal.UnstableApi; @@ -70,16 +69,6 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel throw new UnsupportedOperationException(); } - @UnstableApi - @Override - protected final void doShutdownOutput(Throwable cause) throws Exception { - try { - super.doShutdownOutput(cause); - } finally { - close(); - } - } - abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception; @Override diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index 9e9a59eab7..4656231d7a 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; @@ -34,6 +35,8 @@ import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.net.SocketAddress; @@ -43,6 +46,7 @@ import java.util.concurrent.Executor; @UnstableApi public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static final String EXPECTED_TYPES = " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + @@ -372,51 +376,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel @UnstableApi @Override - protected final void doShutdownOutput(Throwable cause) throws Exception { - try { - // The native socket implementation may throw a NotYetConnected exception when we attempt to shut it down. - // However NIO doesn't propagate an exception in the same situation (write failure), and we just want to - // update the socket state to flag that it has been shutdown. So don't use a voidPromise but instead create - // a new promise and ignore the results. - shutdownOutput0(newPromise()); - } finally { - super.doShutdownOutput(cause); - } - } - - private void shutdownOutput0(final ChannelPromise promise) { - try { - try { - socket.shutdown(false, true); - } finally { - ((AbstractUnsafe) unsafe()).shutdownOutput(); - } - promise.setSuccess(); - } catch (Throwable cause) { - promise.setFailure(cause); - } - } - - private void shutdownInput0(final ChannelPromise promise) { - try { - socket.shutdown(true, false); - promise.setSuccess(); - } catch (Throwable cause) { - promise.setFailure(cause); - } - } - - private void shutdown0(final ChannelPromise promise) { - try { - try { - socket.shutdown(true, true); - } finally { - ((AbstractUnsafe) unsafe()).shutdownOutput(); - } - promise.setSuccess(); - } catch (Throwable cause) { - promise.setFailure(cause); - } + protected final void doShutdownOutput() throws Exception { + socket.shutdown(false, true); } @Override @@ -441,26 +402,16 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { - Executor closeExecutor = ((KQueueStreamUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + ((AbstractUnsafe) unsafe()).shutdownOutput(promise); + } else { + loop.execute(new Runnable() { @Override public void run() { - shutdownOutput0(promise); + ((AbstractUnsafe) unsafe()).shutdownOutput(promise); } }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdownOutput0(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownOutput0(promise); - } - }); - } } return promise; } @@ -472,30 +423,30 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel @Override public ChannelFuture shutdownInput(final ChannelPromise promise) { - Executor closeExecutor = ((KQueueStreamUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownInput0(promise); + } else { + loop.execute(new Runnable() { @Override public void run() { shutdownInput0(promise); } }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdownInput0(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownInput0(promise); - } - }); - } } return promise; } + private void shutdownInput0(ChannelPromise promise) { + try { + socket.shutdown(true, false); + } catch (Throwable cause) { + promise.setFailure(cause); + return; + } + promise.setSuccess(); + } + @Override public ChannelFuture shutdown() { return shutdown(newPromise()); @@ -503,30 +454,52 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel @Override public ChannelFuture shutdown(final ChannelPromise promise) { - Executor closeExecutor = ((KQueueStreamUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { + ChannelFuture shutdownOutputFuture = shutdownOutput(); + if (shutdownOutputFuture.isDone()) { + shutdownOutputDone(shutdownOutputFuture, promise); + } else { + shutdownOutputFuture.addListener(new ChannelFutureListener() { @Override - public void run() { - shutdown0(promise); + public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception { + shutdownOutputDone(shutdownOutputFuture, promise); } }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdown0(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdown0(promise); - } - }); - } } return promise; } + private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) { + ChannelFuture shutdownInputFuture = shutdownInput(); + if (shutdownInputFuture.isDone()) { + shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); + } else { + shutdownInputFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception { + shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); + } + }); + } + } + + private static void shutdownDone(ChannelFuture shutdownOutputFuture, + ChannelFuture shutdownInputFuture, + ChannelPromise promise) { + Throwable shutdownOutputCause = shutdownOutputFuture.cause(); + Throwable shutdownInputCause = shutdownInputFuture.cause(); + if (shutdownOutputCause != null) { + if (shutdownInputCause != null) { + logger.debug("Exception suppressed because a previous exception occurred.", + shutdownInputCause); + } + promise.setFailure(shutdownOutputCause); + } else if (shutdownInputCause != null) { + promise.setFailure(shutdownInputCause); + } else { + promise.setSuccess(); + } + } + class KQueueStreamUnsafe extends AbstractKQueueUnsafe { // Overridden here just to be able to access this method from AbstractKQueueStreamChannel @Override diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index 59e730114c..887951a28f 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -397,16 +397,6 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement return false; } - @UnstableApi - @Override - protected void doShutdownOutput(Throwable cause) throws Exception { - // UDP sockets are not connected. A write failure may just be temporary or {@link #doDisconnect()} was called. - ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); - if (channelOutboundBuffer != null) { - channelOutboundBuffer.remove(cause); - } - } - @Override protected void doClose() throws Exception { super.doClose(); diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueChannelConfigTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueChannelConfigTest.java index 2bb08aac63..563e75eee2 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueChannelConfigTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueChannelConfigTest.java @@ -15,10 +15,16 @@ */ package io.netty.channel.kqueue; +import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; import org.junit.Before; import org.junit.Test; +import java.net.InetSocketAddress; + import static org.junit.Assert.fail; public class KQueueChannelConfigTest { @@ -52,4 +58,22 @@ public class KQueueChannelConfigTest { // expected } } + + // See https://github.com/netty/netty/issues/7159 + @Test + public void testSoLingerNoAssertError() throws Exception { + EventLoopGroup group = new KQueueEventLoopGroup(1); + + try { + Bootstrap bootstrap = new Bootstrap(); + KQueueSocketChannel ch = (KQueueSocketChannel) bootstrap.group(group) + .channel(KQueueSocketChannel.class) + .option(ChannelOption.SO_LINGER, 10) + .handler(new ChannelInboundHandlerAdapter()) + .bind(new InetSocketAddress(0)).syncUninterruptibly().channel(); + ch.close().syncUninterruptibly(); + } finally { + group.shutdownGracefully(); + } + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index bf248075b2..ad671423df 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -616,8 +616,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes. */ @UnstableApi - public final void shutdownOutput() { - shutdownOutput(null); + public final void shutdownOutput(final ChannelPromise promise) { + assertEventLoop(); + shutdownOutput(promise, null); } /** @@ -625,15 +626,60 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes. * @param cause The cause which may provide rational for the shutdown. */ - final void shutdownOutput(Throwable cause) { + private void shutdownOutput(final ChannelPromise promise, Throwable cause) { + if (!promise.setUncancellable()) { + return; + } + final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { + promise.setFailure(CLOSE_CLOSED_CHANNEL_EXCEPTION); return; } this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. - ChannelOutputShutdownException e = new ChannelOutputShutdownException("Channel output shutdown", cause); - outboundBuffer.failFlushed(e, false); - outboundBuffer.close(e, true); + + final Throwable shutdownCause = cause == null ? + new ChannelOutputShutdownException("Channel output shutdown") : + new ChannelOutputShutdownException("Channel output shutdown", cause); + Executor closeExecutor = prepareToClose(); + if (closeExecutor != null) { + closeExecutor.execute(new Runnable() { + @Override + public void run() { + try { + // Execute the shutdown. + doShutdownOutput(); + promise.setSuccess(); + } catch (Throwable err) { + promise.setFailure(err); + } finally { + // Dispatch to the EventLoop + eventLoop().execute(new Runnable() { + @Override + public void run() { + closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause); + } + }); + } + } + }); + } else { + try { + // Execute the shutdown. + doShutdownOutput(); + promise.setSuccess(); + } catch (Throwable err) { + promise.setFailure(err); + } finally { + closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause); + } + } + } + + private void closeOutboundBufferForShutdown( + ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) { + buffer.failFlushed(cause, false); + buffer.close(cause, true); pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE); } @@ -899,7 +945,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { try { - doShutdownOutput(t); + shutdownOutput(voidPromise(), t); } catch (Throwable t2) { close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } @@ -1039,11 +1085,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha /** * Called when conditions justify shutting down the output portion of the channel. This may happen if a write * operation throws an exception. - * @param cause The cause for the shutdown. */ @UnstableApi - protected void doShutdownOutput(Throwable cause) throws Exception { - ((AbstractUnsafe) unsafe).shutdownOutput(cause); + protected void doShutdownOutput() throws Exception { + doClose(); } /** diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index ad21e92638..126ce5fe62 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -15,8 +15,6 @@ */ package io.netty.channel; -import io.netty.util.internal.UnstableApi; - import java.net.SocketAddress; /** @@ -60,16 +58,6 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S throw new UnsupportedOperationException(); } - @UnstableApi - @Override - protected final void doShutdownOutput(Throwable cause) throws Exception { - try { - super.doShutdownOutput(cause); - } finally { - close(); - } - } - @Override protected AbstractUnsafe newUnsafe() { return new DefaultServerUnsafe(); diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 9708773bec..4fe1f9d28c 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -696,13 +696,6 @@ public class EmbeddedChannel extends AbstractChannel { state = State.CLOSED; } - @UnstableApi - @Override - protected final void doShutdownOutput(Throwable cause) throws Exception { - super.doShutdownOutput(cause); - close(); - } - @Override protected void doBeginRead() throws Exception { // NOOP diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index be1684c616..39cf1a7588 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -32,7 +32,6 @@ import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ThrowableUtil; -import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -292,13 +291,6 @@ public class LocalChannel extends AbstractChannel { } } - @UnstableApi - @Override - protected final void doShutdownOutput(Throwable cause) throws Exception { - super.doShutdownOutput(cause); - close(); - } - private void tryClose(boolean isActive) { if (isActive) { unsafe().close(unsafe().voidPromise()); diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index c03bd947eb..3d05a46184 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -21,7 +21,6 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.ServerChannel; -import io.netty.util.internal.UnstableApi; import java.io.IOException; import java.net.PortUnreachableException; @@ -155,7 +154,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { } break; } - } catch (IOException e) { + } catch (Exception e) { if (continueOnWriteError()) { in.remove(e); } else { @@ -180,13 +179,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { !(this instanceof ServerChannel); } - @UnstableApi - @Override - protected void doShutdownOutput(Throwable cause) throws Exception { - super.doShutdownOutput(cause); - close(); - } - /** * Read messages into the given array and return the amount which was read. */ diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java index 4208335a55..0543b83c13 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java @@ -19,7 +19,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelPipeline; import io.netty.channel.RecvByteBufAllocator; -import io.netty.util.internal.UnstableApi; import java.io.IOException; import java.util.ArrayList; @@ -104,13 +103,6 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { } } - @UnstableApi - @Override - protected void doShutdownOutput(Throwable cause) throws Exception { - super.doShutdownOutput(cause); - close(); - } - /** * Read messages into the given array and return the amount which was read. */ diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 3b446b81cd..9803bfa2a0 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -235,16 +235,6 @@ public final class NioDatagramChannel javaChannel().close(); } - @UnstableApi - @Override - protected void doShutdownOutput(Throwable cause) throws Exception { - // UDP sockets are not connected. A write failure may just be temporary or doDisconnect() was called. - ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); - if (channelOutboundBuffer != null) { - channelOutboundBuffer.remove(cause); - } - } - @Override protected int doReadMessages(List buf) throws Exception { DatagramChannel ch = javaChannel(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index ed3938b5fd..10fa638efe 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -150,17 +150,11 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @UnstableApi @Override - protected final void doShutdownOutput(final Throwable cause) throws Exception { - ChannelFuture future = shutdownOutput(); - if (future.isDone()) { - super.doShutdownOutput(cause); + protected final void doShutdownOutput() throws Exception { + if (PlatformDependent.javaVersion() >= 7) { + javaChannel().shutdownOutput(); } else { - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - NioSocketChannel.super.doShutdownOutput(cause); - } - }); + javaChannel().socket().shutdownOutput(); } } @@ -171,26 +165,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { - Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { + final EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + ((AbstractUnsafe) unsafe()).shutdownOutput(promise); + } else { + loop.execute(new Runnable() { @Override public void run() { - shutdownOutput0(promise); + ((AbstractUnsafe) unsafe()).shutdownOutput(promise); } }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdownOutput0(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownOutput0(promise); - } - }); - } } return promise; } @@ -207,26 +191,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override public ChannelFuture shutdownInput(final ChannelPromise promise) { - Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownInput0(promise); + } else { + loop.execute(new Runnable() { @Override public void run() { shutdownInput0(promise); } }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdownInput0(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownInput0(promise); - } - }); - } } return promise; } @@ -238,51 +212,51 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override public ChannelFuture shutdown(final ChannelPromise promise) { - Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); - if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { + ChannelFuture shutdownOutputFuture = shutdownOutput(); + if (shutdownOutputFuture.isDone()) { + shutdownOutputDone(shutdownOutputFuture, promise); + } else { + shutdownOutputFuture.addListener(new ChannelFutureListener() { @Override - public void run() { - shutdown0(promise); + public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception { + shutdownOutputDone(shutdownOutputFuture, promise); } }); - } else { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdown0(promise); - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdown0(promise); - } - }); - } } return promise; } - private void shutdownOutput0(final ChannelPromise promise) { - try { - shutdownOutput0(); - promise.setSuccess(); - } catch (Throwable t) { - promise.setFailure(t); + private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) { + ChannelFuture shutdownInputFuture = shutdownInput(); + if (shutdownInputFuture.isDone()) { + shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); + } else { + shutdownInputFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception { + shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); + } + }); } } - private void shutdownOutput0() throws Exception { - try { - if (PlatformDependent.javaVersion() >= 7) { - javaChannel().shutdownOutput(); - } else { - javaChannel().socket().shutdownOutput(); + private static void shutdownDone(ChannelFuture shutdownOutputFuture, + ChannelFuture shutdownInputFuture, + ChannelPromise promise) { + Throwable shutdownOutputCause = shutdownOutputFuture.cause(); + Throwable shutdownInputCause = shutdownInputFuture.cause(); + if (shutdownOutputCause != null) { + if (shutdownInputCause != null) { + logger.debug("Exception suppressed because a previous exception occurred.", + shutdownInputCause); } - } finally { - ((AbstractUnsafe) unsafe()).shutdownOutput(); + promise.setFailure(shutdownOutputCause); + } else if (shutdownInputCause != null) { + promise.setFailure(shutdownInputCause); + } else { + promise.setSuccess(); } } - private void shutdownInput0(final ChannelPromise promise) { try { shutdownInput0(); @@ -300,31 +274,6 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty } } - private void shutdown0(final ChannelPromise promise) { - Throwable cause = null; - try { - shutdownOutput0(); - } catch (Throwable t) { - cause = t; - } - try { - shutdownInput0(); - } catch (Throwable t) { - if (cause == null) { - promise.setFailure(t); - } else { - logger.debug("Exception suppressed because a previous exception occurred.", t); - promise.setFailure(cause); - } - return; - } - if (cause == null) { - promise.setSuccess(); - } else { - promise.setFailure(cause); - } - } - @Override protected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 5763f75572..3e588d5a36 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -32,7 +32,6 @@ import io.netty.channel.socket.DatagramPacket; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; -import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -199,16 +198,6 @@ public class OioDatagramChannel extends AbstractOioMessageChannel socket.disconnect(); } - @UnstableApi - @Override - protected final void doShutdownOutput(Throwable cause) throws Exception { - // UDP sockets are not connected. A write failure may just be temporary or {@link #doDisconnect()} was called. - ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); - if (channelOutboundBuffer != null) { - channelOutboundBuffer.remove(cause); - } - } - @Override protected void doClose() throws Exception { socket.close(); @@ -293,7 +282,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel } socket.send(tmpPacket); in.remove(); - } catch (IOException e) { + } catch (Exception e) { // Continue on write error as a DatagramChannel can write to multiple remote peers // // See https://github.com/netty/netty/issues/2665 diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index 8423230726..0c5580e93c 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; @@ -131,9 +132,8 @@ public class OioSocketChannel extends OioByteStreamChannel implements SocketChan @UnstableApi @Override - protected final void doShutdownOutput(final Throwable cause) throws Exception { - shutdownOutput0(voidPromise()); - super.doShutdownOutput(cause); + protected final void doShutdownOutput() throws Exception { + shutdownOutput0(); } @Override @@ -189,11 +189,7 @@ public class OioSocketChannel extends OioByteStreamChannel implements SocketChan } private void shutdownOutput0() throws IOException { - try { - socket.shutdownOutput(); - } finally { - ((AbstractUnsafe) unsafe()).shutdownOutput(); - } + socket.shutdownOutput(); } @Override @@ -223,42 +219,49 @@ public class OioSocketChannel extends OioByteStreamChannel implements SocketChan @Override public ChannelFuture shutdown(final ChannelPromise promise) { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - shutdown0(promise); + ChannelFuture shutdownOutputFuture = shutdownOutput(); + if (shutdownOutputFuture.isDone()) { + shutdownOutputDone(shutdownOutputFuture, promise); } else { - loop.execute(new Runnable() { + shutdownOutputFuture.addListener(new ChannelFutureListener() { @Override - public void run() { - shutdown0(promise); + public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception { + shutdownOutputDone(shutdownOutputFuture, promise); } }); } return promise; } - private void shutdown0(ChannelPromise promise) { - Throwable cause = null; - try { - shutdownOutput0(); - } catch (Throwable t) { - cause = t; - } - try { - socket.shutdownInput(); - } catch (Throwable t) { - if (cause == null) { - promise.setFailure(t); - } else { - logger.debug("Exception suppressed because a previous exception occurred.", t); - promise.setFailure(cause); - } - return; - } - if (cause == null) { - promise.setSuccess(); + private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) { + ChannelFuture shutdownInputFuture = shutdownInput(); + if (shutdownInputFuture.isDone()) { + shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); } else { - promise.setFailure(cause); + shutdownInputFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception { + shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); + } + }); + } + } + + private static void shutdownDone(ChannelFuture shutdownOutputFuture, + ChannelFuture shutdownInputFuture, + ChannelPromise promise) { + Throwable shutdownOutputCause = shutdownOutputFuture.cause(); + Throwable shutdownInputCause = shutdownInputFuture.cause(); + if (shutdownOutputCause != null) { + if (shutdownInputCause != null) { + logger.debug("Exception suppressed because a previous exception occurred.", + shutdownInputCause); + } + promise.setFailure(shutdownOutputCause); + } else if (shutdownInputCause != null) { + promise.setFailure(shutdownInputCause); + } else { + promise.setSuccess(); } }