Do not defer closing of Channel when in flush

Motivation:

Previously, we deferred the closing of the Channel when we were flushing. This is problematic as this means that if the user adds a ChannelFutureListener, that will close the Channel, the closing will not happen until we are done with flushing. This can lead to more data is sent than expected.

Modifications:

- Do not defer closing when in flush

Result:

Correctly respect order of events and closing the Channel ASAP
This commit is contained in:
Norman Maurer 2015-05-06 12:35:53 +02:00
parent 0ac14b3d3b
commit f839f65c15

View File

@ -572,17 +572,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return;
}
if (inFlush0) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
close(promise);
}
});
return;
}
if (outboundBuffer == null) {
// Only needed if no VoidChannelPromise.
if (!(promise instanceof VoidChannelPromise)) {
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
@ -590,6 +582,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
promise.setSuccess();
}
});
}
return;
}
@ -607,19 +600,42 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise);
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new OneTimeTask() {
@Override
public void run() {
closeAndDeregister(buffer, wasActive);
// Fail all the queued messages
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
buffer.close(CLOSED_CHANNEL_EXCEPTION);
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
closeAndDeregister(buffer, wasActive);
} finally {
// Fail all the queued messages.
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
buffer.close(CLOSED_CHANNEL_EXCEPTION);
}
if (inFlush0) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
@ -634,12 +650,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive) {
// Fail all the queued messages
try {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
if (wasActive && !isActive()) {
invokeLater(new OneTimeTask() {
@Override
@ -651,7 +662,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
deregister(voidPromise());
}
}
@Override
public final void closeForcibly() {