Not trigger channelWritabilityChanged if fail messages before close Channel.
Motivation: We should not trigger channelWritabilityChanged during failing message when we are about to close the Channel as otherwise the use may try again writing even if the Channel is about to get closed. Modifications: Add new boolean param to ChannelOutboundBuffer.failFlushed(...) which allows to specify if we should notify or not. Result: channelWritabilityChanged is not triggered anymore if we cloe the Channel because of an IOException during write.
This commit is contained in:
parent
f839f65c15
commit
57e7571c65
@ -609,7 +609,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
@Override
|
||||
public void run() {
|
||||
// Fail all the queued messages
|
||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
|
||||
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||
fireChannelInactiveAndDeregister(wasActive);
|
||||
}
|
||||
@ -623,7 +623,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
doClose0(promise);
|
||||
} finally {
|
||||
// Fail all the queued messages.
|
||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
|
||||
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||
}
|
||||
if (inFlush0) {
|
||||
@ -784,9 +784,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
if (!isActive()) {
|
||||
try {
|
||||
if (isOpen()) {
|
||||
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
|
||||
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
|
||||
} else {
|
||||
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
||||
// Do not trigger channelWritabilityChanged because the channel is closed already.
|
||||
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
|
||||
}
|
||||
} finally {
|
||||
inFlush0 = false;
|
||||
@ -797,8 +798,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
try {
|
||||
doWrite(outboundBuffer);
|
||||
} catch (Throwable t) {
|
||||
outboundBuffer.failFlushed(t);
|
||||
if (t instanceof IOException && config().isAutoClose()) {
|
||||
boolean close = t instanceof IOException && config().isAutoClose();
|
||||
// We do not want to trigger channelWritabilityChanged event if the channel is going to be closed.
|
||||
outboundBuffer.failFlushed(t, !close);
|
||||
if (close) {
|
||||
close(voidPromise());
|
||||
}
|
||||
} finally {
|
||||
|
@ -149,7 +149,7 @@ public final class ChannelOutboundBuffer {
|
||||
if (!entry.promise.setUncancellable()) {
|
||||
// Was cancelled so make sure we free up memory and notify about the freed bytes
|
||||
int pending = entry.cancel();
|
||||
decrementPendingOutboundBytes(pending, false);
|
||||
decrementPendingOutboundBytes(pending, false, true);
|
||||
}
|
||||
entry = entry.next;
|
||||
} while (entry != null);
|
||||
@ -183,16 +183,17 @@ public final class ChannelOutboundBuffer {
|
||||
* This method is thread-safe!
|
||||
*/
|
||||
void decrementPendingOutboundBytes(long size) {
|
||||
decrementPendingOutboundBytes(size, true);
|
||||
decrementPendingOutboundBytes(size, true, true);
|
||||
}
|
||||
|
||||
private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
|
||||
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
|
||||
if (size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
|
||||
if (newWriteBufferSize == 0 || newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) {
|
||||
if (notifyWritability && newWriteBufferSize == 0
|
||||
|| newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) {
|
||||
setWritable(invokeLater);
|
||||
}
|
||||
}
|
||||
@ -257,7 +258,7 @@ public final class ChannelOutboundBuffer {
|
||||
// only release message, notify and decrement if it was not canceled before.
|
||||
ReferenceCountUtil.safeRelease(msg);
|
||||
safeSuccess(promise);
|
||||
decrementPendingOutboundBytes(size, false);
|
||||
decrementPendingOutboundBytes(size, false, true);
|
||||
}
|
||||
|
||||
// recycle the entry
|
||||
@ -272,6 +273,10 @@ public final class ChannelOutboundBuffer {
|
||||
* {@code false} to signal that no more messages are ready to be handled.
|
||||
*/
|
||||
public boolean remove(Throwable cause) {
|
||||
return remove0(cause, true);
|
||||
}
|
||||
|
||||
private boolean remove0(Throwable cause, boolean notifyWritability) {
|
||||
Entry e = flushedEntry;
|
||||
if (e == null) {
|
||||
return false;
|
||||
@ -288,7 +293,7 @@ public final class ChannelOutboundBuffer {
|
||||
ReferenceCountUtil.safeRelease(msg);
|
||||
|
||||
safeFail(promise, cause);
|
||||
decrementPendingOutboundBytes(size, false);
|
||||
decrementPendingOutboundBytes(size, false, notifyWritability);
|
||||
}
|
||||
|
||||
// recycle the entry
|
||||
@ -573,7 +578,7 @@ public final class ChannelOutboundBuffer {
|
||||
return flushed == 0;
|
||||
}
|
||||
|
||||
void failFlushed(Throwable cause) {
|
||||
void failFlushed(Throwable cause, boolean notify) {
|
||||
// Make sure that this method does not reenter. A listener added to the current promise can be notified by the
|
||||
// current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
|
||||
// indirectly (usually by closing the channel.)
|
||||
@ -586,7 +591,7 @@ public final class ChannelOutboundBuffer {
|
||||
try {
|
||||
inFail = true;
|
||||
for (;;) {
|
||||
if (!remove(cause)) {
|
||||
if (!remove0(cause, notify)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user