Merge ChannelOutboundBuffer.failUnflushed() and recycle() into a single method and make sure it is run later on reentrance

- Previously, failUnflushed() did not run when inFail is true, which made unflushed writes are not released on reentrance.   This has been fixed by this commit.
- Also, AbstractUnsafe.outboundBuffer is set to null as early as possible to remove the chance of any write attempts made after the closure.
This commit is contained in:
Trustin Lee 2013-07-23 14:33:37 +09:00
parent f4e128b807
commit a89b17fa94
2 changed files with 50 additions and 46 deletions

View File

@ -516,6 +516,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
boolean wasActive = isActive(); boolean wasActive = isActive();
if (closeFuture.setClosed()) { if (closeFuture.setClosed()) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
try { try {
doClose(); doClose();
promise.setSuccess(); promise.setSuccess();
@ -523,14 +526,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
promise.setFailure(t); promise.setFailure(t);
} }
// fail all queued messages // Fail all the queued messages
try { try {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.failUnflushed(CLOSED_CHANNEL_EXCEPTION); outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.recycle();
} finally { } finally {
outboundBuffer = null;
if (wasActive && !isActive()) { if (wasActive && !isActive()) {
invokeLater(new Runnable() { invokeLater(new Runnable() {

View File

@ -29,6 +29,7 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@ -128,20 +129,6 @@ public final class ChannelOutboundBuffer {
unflushedTotals = new long[initialCapacity]; unflushedTotals = new long[initialCapacity];
} }
void recycle() {
if (head != tail) {
throw new IllegalStateException();
}
if (unflushedCount != 0) {
throw new IllegalStateException();
}
if (totalPendingSize != 0) {
throw new IllegalStateException();
}
RECYCLER.recycle(this, handle);
}
void addMessage(Object msg, ChannelPromise promise) { void addMessage(Object msg, ChannelPromise promise) {
Object[] unflushed = this.unflushed; Object[] unflushed = this.unflushed;
int unflushedCount = this.unflushedCount; int unflushedCount = this.unflushedCount;
@ -482,33 +469,6 @@ public final class ChannelOutboundBuffer {
return head == tail; return head == tail;
} }
void failUnflushed(Throwable cause) {
if (inFail) {
return;
}
inFail = true;
// Release all unflushed messages.
Object[] unflushed = this.unflushed;
ChannelPromise[] unflushedPromises = this.unflushedPromises;
int[] unflushedPendingSizes = this.unflushedPendingSizes;
final int unflushedCount = this.unflushedCount;
try {
for (int i = 0; i < unflushedCount; i++) {
safeRelease(unflushed[i]);
unflushed[i] = null;
safeFail(unflushedPromises[i], cause);
unflushedPromises[i] = null;
decrementPendingOutboundBytes(unflushedPendingSizes[i]);
unflushedPendingSizes[i] = 0;
}
} finally {
this.unflushedCount = 0;
inFail = false;
}
}
void failFlushed(Throwable cause) { void failFlushed(Throwable cause) {
// Make sure that this method does not reenter. A listener added to the current promise can be notified by the // Make sure that this method does not reenter. A listener added to the current promise can be notified by the
// current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
@ -531,6 +491,50 @@ public final class ChannelOutboundBuffer {
} }
} }
void close(final ClosedChannelException cause) {
if (inFail) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
close(cause);
}
});
return;
}
inFail = true;
if (channel.isOpen()) {
throw new IllegalStateException("close() must be invoked after the channel is closed.");
}
if (head != tail) {
throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
}
// Release all unflushed messages.
Object[] unflushed = this.unflushed;
ChannelPromise[] unflushedPromises = this.unflushedPromises;
int[] unflushedPendingSizes = this.unflushedPendingSizes;
final int unflushedCount = this.unflushedCount;
try {
for (int i = 0; i < unflushedCount; i++) {
safeRelease(unflushed[i]);
unflushed[i] = null;
safeFail(unflushedPromises[i], cause);
unflushedPromises[i] = null;
// Just decrease; do not trigger any events via decrementPendingOutboundBytes()
totalPendingSize -= unflushedPendingSizes[i];
unflushedPendingSizes[i] = 0;
}
} finally {
this.unflushedCount = 0;
inFail = false;
}
RECYCLER.recycle(this, handle);
}
private static void safeRelease(Object message) { private static void safeRelease(Object message) {
try { try {
ReferenceCountUtil.release(message); ReferenceCountUtil.release(message);