From 57e7571c65f181df97ea7bbab372659a0baedfa1 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 5 May 2015 10:32:18 +0200 Subject: [PATCH] Not trigger channelWritabilityChanged if fail messages before close Channel. Motivation: We should not trigger channelWritabilityChanged during failing message when we are about to close the Channel as otherwise the use may try again writing even if the Channel is about to get closed. Modifications: Add new boolean param to ChannelOutboundBuffer.failFlushed(...) which allows to specify if we should notify or not. Result: channelWritabilityChanged is not triggered anymore if we cloe the Channel because of an IOException during write. --- .../io/netty/channel/AbstractChannel.java | 15 +++++++------ .../netty/channel/ChannelOutboundBuffer.java | 21 ++++++++++++------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index adb6c2e839..250e4efd54 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -609,7 +609,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void run() { // Fail all the queued messages - buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); + buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); buffer.close(CLOSED_CHANNEL_EXCEPTION); fireChannelInactiveAndDeregister(wasActive); } @@ -623,7 +623,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha doClose0(promise); } finally { // Fail all the queued messages. - buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); + buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); buffer.close(CLOSED_CHANNEL_EXCEPTION); } if (inFlush0) { @@ -784,9 +784,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (!isActive()) { try { if (isOpen()) { - outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION); + outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true); } else { - outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); + // Do not trigger channelWritabilityChanged because the channel is closed already. + outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; @@ -797,8 +798,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doWrite(outboundBuffer); } catch (Throwable t) { - outboundBuffer.failFlushed(t); - if (t instanceof IOException && config().isAutoClose()) { + boolean close = t instanceof IOException && config().isAutoClose(); + // We do not want to trigger channelWritabilityChanged event if the channel is going to be closed. + outboundBuffer.failFlushed(t, !close); + if (close) { close(voidPromise()); } } finally { diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index d959fc2909..fb44342bde 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -149,7 +149,7 @@ public final class ChannelOutboundBuffer { if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); - decrementPendingOutboundBytes(pending, false); + decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); @@ -183,16 +183,17 @@ public final class ChannelOutboundBuffer { * This method is thread-safe! */ void decrementPendingOutboundBytes(long size) { - decrementPendingOutboundBytes(size, true); + decrementPendingOutboundBytes(size, true, true); } - private void decrementPendingOutboundBytes(long size, boolean invokeLater) { + private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); - if (newWriteBufferSize == 0 || newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) { + if (notifyWritability && newWriteBufferSize == 0 + || newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } } @@ -257,7 +258,7 @@ public final class ChannelOutboundBuffer { // only release message, notify and decrement if it was not canceled before. ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); - decrementPendingOutboundBytes(size, false); + decrementPendingOutboundBytes(size, false, true); } // recycle the entry @@ -272,6 +273,10 @@ public final class ChannelOutboundBuffer { * {@code false} to signal that no more messages are ready to be handled. */ public boolean remove(Throwable cause) { + return remove0(cause, true); + } + + private boolean remove0(Throwable cause, boolean notifyWritability) { Entry e = flushedEntry; if (e == null) { return false; @@ -288,7 +293,7 @@ public final class ChannelOutboundBuffer { ReferenceCountUtil.safeRelease(msg); safeFail(promise, cause); - decrementPendingOutboundBytes(size, false); + decrementPendingOutboundBytes(size, false, notifyWritability); } // recycle the entry @@ -573,7 +578,7 @@ public final class ChannelOutboundBuffer { return flushed == 0; } - void failFlushed(Throwable cause) { + void failFlushed(Throwable cause, boolean notify) { // Make sure that this method does not reenter. A listener added to the current promise can be notified by the // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call // indirectly (usually by closing the channel.) @@ -586,7 +591,7 @@ public final class ChannelOutboundBuffer { try { inFail = true; for (;;) { - if (!remove(cause)) { + if (!remove0(cause, notify)) { break; } }