Do not defer closing of Channel when in flush

Motivation:

Previously, we deferred the closing of the Channel when we were flushing. This is problematic as this means that if the user adds a ChannelFutureListener, that will close the Channel, the closing will not happen until we are done with flushing. This can lead to more data is sent than expected.

Modifications:

- Do not defer closing when in flush

Result:

Correctly respect order of events and closing the Channel ASAP
This commit is contained in:
Norman Maurer 2015-05-06 12:35:53 +02:00
parent caac01b0f5
commit 0f8cf690cb

View File

@ -604,24 +604,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return; return;
} }
if (inFlush0) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
close(promise);
}
});
return;
}
if (outboundBuffer == null) { if (outboundBuffer == null) {
// This means close() was called before so we just register a listener and return // Only needed if no VoidChannelPromise.
closeFuture.addListener(new ChannelFutureListener() { if (!(promise instanceof VoidChannelPromise)) {
@Override // This means close() was called before so we just register a listener and return
public void operationComplete(ChannelFuture future) throws Exception { closeFuture.addListener(new ChannelFutureListener() {
promise.setSuccess(); @Override
} public void operationComplete(ChannelFuture future) throws Exception {
}); promise.setSuccess();
}
});
}
return; return;
} }
@ -639,64 +632,72 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
closeExecutor.execute(new OneTimeTask() { closeExecutor.execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
Throwable cause = null;
try { try {
doClose(); // Execute the close.
} catch (Throwable t) { doClose0(promise);
cause = t; } finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new OneTimeTask() {
@Override
public void run() {
// Fail all the queued messages
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
buffer.close(CLOSED_CHANNEL_EXCEPTION);
fireChannelInactiveAndDeregister(wasActive);
}
});
} }
final Throwable error = cause;
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new OneTimeTask() {
@Override
public void run() {
closeAndDeregister(buffer, wasActive, promise, error);
}
});
} }
}); });
} else { } else {
Throwable error = null;
try { try {
doClose(); // Close the channel and fail the queued messages in all cases.
} catch (Throwable t) { doClose0(promise);
error = t; } finally {
// Fail all the queued messages.
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
buffer.close(CLOSED_CHANNEL_EXCEPTION);
}
if (inFlush0) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
} }
closeAndDeregister(buffer, wasActive, promise, error);
} }
} }
private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive, private void doClose0(ChannelPromise promise) {
ChannelPromise promise, Throwable error) {
// Fail all the queued messages
try { try {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); doClose();
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
if (wasActive && !isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelInactive();
deregister(voidPromise());
}
});
} else {
invokeLater(new OneTimeTask() {
@Override
public void run() {
deregister(voidPromise());
}
});
}
// Now complete the closeFuture and promise.
closeFuture.setClosed(); closeFuture.setClosed();
if (error != null) { safeSetSuccess(promise);
safeSetFailure(promise, error); } catch (Throwable t) {
} else { closeFuture.setClosed();
safeSetSuccess(promise); safeSetFailure(promise, t);
} }
}
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
if (wasActive && !isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelInactive();
deregister(voidPromise());
}
});
} else {
invokeLater(new OneTimeTask() {
@Override
public void run() {
deregister(voidPromise());
}
});
} }
} }