Do not defer closing of Channel when in flush
Motivation: Previously, we deferred the closing of the Channel when we were flushing. This is problematic as this means that if the user adds a ChannelFutureListener, that will close the Channel, the closing will not happen until we are done with flushing. This can lead to more data is sent than expected. Modifications: - Do not defer closing when in flush Result: Correctly respect order of events and closing the Channel ASAP
This commit is contained in:
parent
0d780601c9
commit
275e2f0b36
@ -535,24 +535,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
return;
|
||||
}
|
||||
|
||||
if (inFlush0) {
|
||||
invokeLater(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
close(promise);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (outboundBuffer == null) {
|
||||
// This means close() was called before so we just register a listener and return
|
||||
closeFuture.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
promise.setSuccess();
|
||||
}
|
||||
});
|
||||
// Only needed if no VoidChannelPromise.
|
||||
if (!(promise instanceof VoidChannelPromise)) {
|
||||
// This means close() was called before so we just register a listener and return
|
||||
closeFuture.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
promise.setSuccess();
|
||||
}
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -570,19 +563,42 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
closeExecutor.execute(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
doClose0(promise);
|
||||
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
|
||||
invokeLater(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
closeAndDeregister(buffer, wasActive);
|
||||
}
|
||||
});
|
||||
try {
|
||||
// Execute the close.
|
||||
doClose0(promise);
|
||||
} finally {
|
||||
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
|
||||
invokeLater(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Fail all the queued messages
|
||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||
fireChannelInactiveAndDeregister(wasActive);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
doClose0(promise);
|
||||
closeAndDeregister(buffer, wasActive);
|
||||
try {
|
||||
// Close the channel and fail the queued messages in all cases.
|
||||
doClose0(promise);
|
||||
} finally {
|
||||
// Fail all the queued messages.
|
||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||
}
|
||||
if (inFlush0) {
|
||||
invokeLater(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
fireChannelInactiveAndDeregister(wasActive);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
fireChannelInactiveAndDeregister(wasActive);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -597,23 +613,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
}
|
||||
}
|
||||
|
||||
private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive) {
|
||||
// Fail all the queued messages
|
||||
try {
|
||||
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||
} finally {
|
||||
if (wasActive && !isActive()) {
|
||||
invokeLater(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.fireChannelInactive();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
deregister(voidPromise());
|
||||
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
|
||||
if (wasActive && !isActive()) {
|
||||
invokeLater(new OneTimeTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.fireChannelInactive();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
deregister(voidPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user