diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 3449402ab8..7dd9a6e996 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -584,10 +584,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public final void close(final ChannelPromise promise) { + close(promise, CLOSED_CHANNEL_EXCEPTION, false); + } + + private void close(final ChannelPromise promise, final Throwable cause, final boolean notify) { if (!promise.setUncancellable()) { return; } + final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // Only needed if no VoidChannelPromise. if (!(promise instanceof VoidChannelPromise)) { @@ -609,8 +614,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } final boolean wasActive = isActive(); - final ChannelOutboundBuffer buffer = outboundBuffer; - outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. + this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. Executor closeExecutor = closeExecutor(); if (closeExecutor != null) { closeExecutor.execute(new OneTimeTask() { @@ -625,8 +629,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void run() { // Fail all the queued messages - buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); - buffer.close(CLOSED_CHANNEL_EXCEPTION); + outboundBuffer.failFlushed(cause, notify); + outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); fireChannelInactiveAndDeregister(wasActive); } }); @@ -639,8 +643,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha doClose0(promise); } finally { // Fail all the queued messages. - buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); - buffer.close(CLOSED_CHANNEL_EXCEPTION); + outboundBuffer.failFlushed(cause, notify); + outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); } if (inFlush0) { invokeLater(new OneTimeTask() { @@ -814,11 +818,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doWrite(outboundBuffer); } catch (Throwable t) { - boolean close = t instanceof IOException && config().isAutoClose(); - // We do not want to trigger channelWritabilityChanged event if the channel is going to be closed. - outboundBuffer.failFlushed(t, !close); - if (close) { - close(voidPromise()); + if (t instanceof IOException && config().isAutoClose()) { + /** + * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of + * failing all flushed messages and also ensure the actual close of the underlying transport + * will happen before the promises are notified. + * + * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} + * may still return {@code true} even if the channel should be closed as result of the exception. + */ + close(voidPromise(), t, false); + } else { + outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false;