AbstractChannel should call doClose even after shutdownOutput

Motivation:
ShutdownOutput now fails all pending writes in the ChannelOutboundBuffer and sets it to null. However the Close code path uses the ChannelOutboundBuffer as an indication that the close operation is in progress and exits early and will not call doClose. This will lead to the Channel not actually being fully closed.

Bug introduced by 237a4da1b7

Modifications:
- AbstractChannel#close shouldn't exit early just because outboundBuffer is null, and instead should use additional state closeInitiated to avoid duplicate close operations

Result:
AbstractChannel#close(..) after AbstractChannel#shutdownOutbound() will still invoke doClose and cleanup Channel state.
This commit is contained in:
Scott Mitchell 2017-08-24 01:04:06 -07:00 committed by Norman Maurer
parent 1065e0f26e
commit c80fdc8241

View File

@ -66,6 +66,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private boolean closeInitiated;
/** Cache for the string representation of this channel */
private boolean strValActive;
@ -629,10 +630,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// Only needed if no VoidChannelPromise.
if (!(promise instanceof VoidChannelPromise)) {
if (closeInitiated) {
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
@ -644,13 +646,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return;
}
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise);
return;
}
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
@ -665,9 +664,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
invokeLater(new Runnable() {
@Override
public void run() {
// Fail all the queued messages
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
if (outboundBuffer != null) {
// Fail all the queued messages
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
@ -679,9 +680,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
invokeLater(new Runnable() {