From da39e601e0ab59c268c9f8328afde06cd69600ab Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 15 Sep 2015 20:47:24 +0200 Subject: [PATCH] Ensure close caused by write will happen before write promise is notified Motiviation: We need to ensure the actual close to the transport takes place before the promsie of the write is notified that triggered it. This is needed as otherwise Channel.isActive(), isOpen() and isWritable() may return true even if the Channel should be closed already. Modifications: - Ensure the close takes place first Result: ChannelFutureListener will see the correct state of the Channel. --- .../io/netty/channel/AbstractChannel.java | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 4981d65f39..4ce044958d 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -531,10 +531,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public final void close(final ChannelPromise promise) { + close(promise, CLOSED_CHANNEL_EXCEPTION, false); + } + + private void close(final ChannelPromise promise, final Throwable cause, final boolean notify) { if (!promise.setUncancellable()) { return; } + final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // Only needed if no VoidChannelPromise. if (!(promise instanceof VoidChannelPromise)) { @@ -556,8 +561,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } final boolean wasActive = isActive(); - final ChannelOutboundBuffer buffer = outboundBuffer; - outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. + this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. Executor closeExecutor = closeExecutor(); if (closeExecutor != null) { closeExecutor.execute(new OneTimeTask() { @@ -572,8 +576,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void run() { // Fail all the queued messages - buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); - buffer.close(CLOSED_CHANNEL_EXCEPTION); + outboundBuffer.failFlushed(cause, notify); + outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); fireChannelInactiveAndDeregister(wasActive); } }); @@ -586,8 +590,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha doClose0(promise); } finally { // Fail all the queued messages. - buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); - buffer.close(CLOSED_CHANNEL_EXCEPTION); + outboundBuffer.failFlushed(cause, notify); + outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); } if (inFlush0) { invokeLater(new OneTimeTask() { @@ -760,11 +764,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doWrite(outboundBuffer); } catch (Throwable t) { - boolean close = t instanceof IOException && config().isAutoClose(); - // We do not want to trigger channelWritabilityChanged event if the channel is going to be closed. - outboundBuffer.failFlushed(t, !close); - if (close) { - close(voidPromise()); + if (t instanceof IOException && config().isAutoClose()) { + /** + * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of + * failing all flushed messages and also ensure the actual close of the underlying transport + * will happen before the promises are notified. + * + * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} + * may still return {@code true} even if the channel should be closed as result of the exception. + */ + close(voidPromise(), t, false); + } else { + outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false;