Ensure close caused by write will happen before write promise is notified
Motiviation: We need to ensure the actual close to the transport takes place before the promsie of the write is notified that triggered it. This is needed as otherwise Channel.isActive(), isOpen() and isWritable() may return true even if the Channel should be closed already. Modifications: - Ensure the close takes place first Result: ChannelFutureListener will see the correct state of the Channel.
This commit is contained in:
parent
7961138f52
commit
da39e601e0
@ -531,10 +531,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void close(final ChannelPromise promise) {
|
public final void close(final ChannelPromise promise) {
|
||||||
|
close(promise, CLOSED_CHANNEL_EXCEPTION, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void close(final ChannelPromise promise, final Throwable cause, final boolean notify) {
|
||||||
if (!promise.setUncancellable()) {
|
if (!promise.setUncancellable()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
|
||||||
if (outboundBuffer == null) {
|
if (outboundBuffer == null) {
|
||||||
// Only needed if no VoidChannelPromise.
|
// Only needed if no VoidChannelPromise.
|
||||||
if (!(promise instanceof VoidChannelPromise)) {
|
if (!(promise instanceof VoidChannelPromise)) {
|
||||||
@ -556,8 +561,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
}
|
}
|
||||||
|
|
||||||
final boolean wasActive = isActive();
|
final boolean wasActive = isActive();
|
||||||
final ChannelOutboundBuffer buffer = outboundBuffer;
|
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
|
||||||
outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
|
|
||||||
Executor closeExecutor = closeExecutor();
|
Executor closeExecutor = closeExecutor();
|
||||||
if (closeExecutor != null) {
|
if (closeExecutor != null) {
|
||||||
closeExecutor.execute(new OneTimeTask() {
|
closeExecutor.execute(new OneTimeTask() {
|
||||||
@ -572,8 +576,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
// Fail all the queued messages
|
// Fail all the queued messages
|
||||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
|
outboundBuffer.failFlushed(cause, notify);
|
||||||
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||||
fireChannelInactiveAndDeregister(wasActive);
|
fireChannelInactiveAndDeregister(wasActive);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -586,8 +590,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
doClose0(promise);
|
doClose0(promise);
|
||||||
} finally {
|
} finally {
|
||||||
// Fail all the queued messages.
|
// Fail all the queued messages.
|
||||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
|
outboundBuffer.failFlushed(cause, notify);
|
||||||
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||||
}
|
}
|
||||||
if (inFlush0) {
|
if (inFlush0) {
|
||||||
invokeLater(new OneTimeTask() {
|
invokeLater(new OneTimeTask() {
|
||||||
@ -760,11 +764,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
try {
|
try {
|
||||||
doWrite(outboundBuffer);
|
doWrite(outboundBuffer);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
boolean close = t instanceof IOException && config().isAutoClose();
|
if (t instanceof IOException && config().isAutoClose()) {
|
||||||
// We do not want to trigger channelWritabilityChanged event if the channel is going to be closed.
|
/**
|
||||||
outboundBuffer.failFlushed(t, !close);
|
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
|
||||||
if (close) {
|
* failing all flushed messages and also ensure the actual close of the underlying transport
|
||||||
close(voidPromise());
|
* will happen before the promises are notified.
|
||||||
|
*
|
||||||
|
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
|
||||||
|
* may still return {@code true} even if the channel should be closed as result of the exception.
|
||||||
|
*/
|
||||||
|
close(voidPromise(), t, false);
|
||||||
|
} else {
|
||||||
|
outboundBuffer.failFlushed(t, true);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
inFlush0 = false;
|
inFlush0 = false;
|
||||||
|
Loading…
Reference in New Issue
Block a user