From a89b17fa944504a00107d40ecf6043de197f34df Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 23 Jul 2013 14:33:37 +0900 Subject: [PATCH] Merge ChannelOutboundBuffer.failUnflushed() and recycle() into a single method and make sure it is run later on reentrance - Previously, failUnflushed() did not run when inFail is true, which made unflushed writes are not released on reentrance. This has been fixed by this commit. - Also, AbstractUnsafe.outboundBuffer is set to null as early as possible to remove the chance of any write attempts made after the closure. --- .../io/netty/channel/AbstractChannel.java | 10 +-- .../netty/channel/ChannelOutboundBuffer.java | 86 ++++++++++--------- 2 files changed, 50 insertions(+), 46 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 74fb5aa599..4f544bc67a 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -516,6 +516,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha boolean wasActive = isActive(); if (closeFuture.setClosed()) { + ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; + this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. + try { doClose(); promise.setSuccess(); @@ -523,14 +526,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha promise.setFailure(t); } - // fail all queued messages + // Fail all the queued messages try { - ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); - outboundBuffer.failUnflushed(CLOSED_CHANNEL_EXCEPTION); - outboundBuffer.recycle(); + outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); } finally { - outboundBuffer = null; if (wasActive && !isActive()) { invokeLater(new Runnable() { diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index daf411808a..bdec172a9e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -29,6 +29,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -128,20 +129,6 @@ public final class ChannelOutboundBuffer { unflushedTotals = new long[initialCapacity]; } - void recycle() { - if (head != tail) { - throw new IllegalStateException(); - } - if (unflushedCount != 0) { - throw new IllegalStateException(); - } - if (totalPendingSize != 0) { - throw new IllegalStateException(); - } - - RECYCLER.recycle(this, handle); - } - void addMessage(Object msg, ChannelPromise promise) { Object[] unflushed = this.unflushed; int unflushedCount = this.unflushedCount; @@ -482,33 +469,6 @@ public final class ChannelOutboundBuffer { return head == tail; } - void failUnflushed(Throwable cause) { - if (inFail) { - return; - } - - inFail = true; - - // Release all unflushed messages. - Object[] unflushed = this.unflushed; - ChannelPromise[] unflushedPromises = this.unflushedPromises; - int[] unflushedPendingSizes = this.unflushedPendingSizes; - final int unflushedCount = this.unflushedCount; - try { - for (int i = 0; i < unflushedCount; i++) { - safeRelease(unflushed[i]); - unflushed[i] = null; - safeFail(unflushedPromises[i], cause); - unflushedPromises[i] = null; - decrementPendingOutboundBytes(unflushedPendingSizes[i]); - unflushedPendingSizes[i] = 0; - } - } finally { - this.unflushedCount = 0; - inFail = false; - } - } - void failFlushed(Throwable cause) { // Make sure that this method does not reenter. A listener added to the current promise can be notified by the // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call @@ -531,6 +491,50 @@ public final class ChannelOutboundBuffer { } } + void close(final ClosedChannelException cause) { + if (inFail) { + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + close(cause); + } + }); + return; + } + + inFail = true; + + if (channel.isOpen()) { + throw new IllegalStateException("close() must be invoked after the channel is closed."); + } + + if (head != tail) { + throw new IllegalStateException("close() must be invoked after all flushed writes are handled."); + } + + // Release all unflushed messages. + Object[] unflushed = this.unflushed; + ChannelPromise[] unflushedPromises = this.unflushedPromises; + int[] unflushedPendingSizes = this.unflushedPendingSizes; + final int unflushedCount = this.unflushedCount; + try { + for (int i = 0; i < unflushedCount; i++) { + safeRelease(unflushed[i]); + unflushed[i] = null; + safeFail(unflushedPromises[i], cause); + unflushedPromises[i] = null; + // Just decrease; do not trigger any events via decrementPendingOutboundBytes() + totalPendingSize -= unflushedPendingSizes[i]; + unflushedPendingSizes[i] = 0; + } + } finally { + this.unflushedCount = 0; + inFail = false; + } + + RECYCLER.recycle(this, handle); + } + private static void safeRelease(Object message) { try { ReferenceCountUtil.release(message);