Make sure all write attempts made after a channel is closed are marked as failure

This commit is contained in:
Trustin Lee 2013-07-17 21:14:07 +09:00
parent a8d67b0282
commit cc9b9567b9

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder; import io.netty.buffer.ByteBufHolder;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom; import io.netty.util.internal.ThreadLocalRandom;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -38,6 +39,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();
static {
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}
private final Channel parent; private final Channel parent;
private final long hashCode = ThreadLocalRandom.current().nextLong(); private final long hashCode = ThreadLocalRandom.current().nextLong();
private final Unsafe unsafe; private final Unsafe unsafe;
@ -53,7 +62,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private volatile EventLoop eventLoop; private volatile EventLoop eventLoop;
private volatile boolean registered; private volatile boolean registered;
private ClosedChannelException closedChannelException;
private boolean inFlush0; private boolean inFlush0;
/** Cache for the string representation of this channel */ /** Cache for the string representation of this channel */
@ -500,10 +508,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (!outboundBuffer.isEmpty()) { if (!outboundBuffer.isEmpty()) {
// fail all queued messages // fail all queued messages
if (closedChannelException == null) { outboundBuffer.fail(CLOSED_CHANNEL_EXCEPTION);
closedChannelException = new ClosedChannelException();
}
outboundBuffer.fail(closedChannelException);
} }
outboundBuffer.clearUnflushed(); outboundBuffer.clearUnflushed();
@ -589,8 +594,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override @Override
public void write(Object msg, ChannelPromise promise) { public void write(Object msg, ChannelPromise promise) {
if (!isActive()) {
// Mark the write request as failure if the channl is inactive.
if (isOpen()) {
promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
} else {
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
}
} else {
outboundBuffer.addMessage(msg, promise); outboundBuffer.addMessage(msg, promise);
} }
}
@Override @Override
public void flush() { public void flush() {
@ -611,9 +625,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// Mark all pending write requests as failure if the channel is inactive. // Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) { if (!isActive()) {
if (isOpen()) { if (isOpen()) {
outboundBuffer.fail(new NotYetConnectedException()); outboundBuffer.fail(NOT_YET_CONNECTED_EXCEPTION);
} else { } else {
outboundBuffer.fail(new ClosedChannelException()); outboundBuffer.fail(CLOSED_CHANNEL_EXCEPTION);
} }
inFlush0 = false; inFlush0 = false;
return; return;
@ -675,8 +689,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return true; return true;
} }
Exception e = new ClosedChannelException(); promise.setFailure(CLOSED_CHANNEL_EXCEPTION);
promise.setFailure(e);
return false; return false;
} }