Not trigger channelWritabilityChanged if fail messages before close Channel.
Motivation: We should not trigger channelWritabilityChanged during failing message when we are about to close the Channel as otherwise the use may try again writing even if the Channel is about to get closed. Modifications: Add new boolean param to ChannelOutboundBuffer.failFlushed(...) which allows to specify if we should notify or not. Result: channelWritabilityChanged is not triggered anymore if we cloe the Channel because of an IOException during write.
This commit is contained in:
parent
275e2f0b36
commit
0d792c3aa0
@ -572,7 +572,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
// Fail all the queued messages
|
// Fail all the queued messages
|
||||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
|
||||||
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||||
fireChannelInactiveAndDeregister(wasActive);
|
fireChannelInactiveAndDeregister(wasActive);
|
||||||
}
|
}
|
||||||
@ -586,7 +586,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
doClose0(promise);
|
doClose0(promise);
|
||||||
} finally {
|
} finally {
|
||||||
// Fail all the queued messages.
|
// Fail all the queued messages.
|
||||||
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
buffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
|
||||||
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
buffer.close(CLOSED_CHANNEL_EXCEPTION);
|
||||||
}
|
}
|
||||||
if (inFlush0) {
|
if (inFlush0) {
|
||||||
@ -746,9 +746,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
if (!isActive()) {
|
if (!isActive()) {
|
||||||
try {
|
try {
|
||||||
if (isOpen()) {
|
if (isOpen()) {
|
||||||
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
|
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
|
||||||
} else {
|
} else {
|
||||||
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
|
// Do not trigger channelWritabilityChanged because the channel is closed already.
|
||||||
|
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
inFlush0 = false;
|
inFlush0 = false;
|
||||||
@ -759,8 +760,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
try {
|
try {
|
||||||
doWrite(outboundBuffer);
|
doWrite(outboundBuffer);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
outboundBuffer.failFlushed(t);
|
boolean close = t instanceof IOException && config().isAutoClose();
|
||||||
if (t instanceof IOException && config().isAutoClose()) {
|
// We do not want to trigger channelWritabilityChanged event if the channel is going to be closed.
|
||||||
|
outboundBuffer.failFlushed(t, !close);
|
||||||
|
if (close) {
|
||||||
close(voidPromise());
|
close(voidPromise());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -149,7 +149,7 @@ public final class ChannelOutboundBuffer {
|
|||||||
if (!entry.promise.setUncancellable()) {
|
if (!entry.promise.setUncancellable()) {
|
||||||
// Was cancelled so make sure we free up memory and notify about the freed bytes
|
// Was cancelled so make sure we free up memory and notify about the freed bytes
|
||||||
int pending = entry.cancel();
|
int pending = entry.cancel();
|
||||||
decrementPendingOutboundBytes(pending, false);
|
decrementPendingOutboundBytes(pending, false, true);
|
||||||
}
|
}
|
||||||
entry = entry.next;
|
entry = entry.next;
|
||||||
} while (entry != null);
|
} while (entry != null);
|
||||||
@ -183,16 +183,17 @@ public final class ChannelOutboundBuffer {
|
|||||||
* This method is thread-safe!
|
* This method is thread-safe!
|
||||||
*/
|
*/
|
||||||
void decrementPendingOutboundBytes(long size) {
|
void decrementPendingOutboundBytes(long size) {
|
||||||
decrementPendingOutboundBytes(size, true);
|
decrementPendingOutboundBytes(size, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
|
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
|
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
|
||||||
if (newWriteBufferSize == 0 || newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) {
|
if (notifyWritability && newWriteBufferSize == 0
|
||||||
|
|| newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark()) {
|
||||||
setWritable(invokeLater);
|
setWritable(invokeLater);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -257,7 +258,7 @@ public final class ChannelOutboundBuffer {
|
|||||||
// only release message, notify and decrement if it was not canceled before.
|
// only release message, notify and decrement if it was not canceled before.
|
||||||
ReferenceCountUtil.safeRelease(msg);
|
ReferenceCountUtil.safeRelease(msg);
|
||||||
safeSuccess(promise);
|
safeSuccess(promise);
|
||||||
decrementPendingOutboundBytes(size, false);
|
decrementPendingOutboundBytes(size, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// recycle the entry
|
// recycle the entry
|
||||||
@ -272,6 +273,10 @@ public final class ChannelOutboundBuffer {
|
|||||||
* {@code false} to signal that no more messages are ready to be handled.
|
* {@code false} to signal that no more messages are ready to be handled.
|
||||||
*/
|
*/
|
||||||
public boolean remove(Throwable cause) {
|
public boolean remove(Throwable cause) {
|
||||||
|
return remove0(cause, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean remove0(Throwable cause, boolean notifyWritability) {
|
||||||
Entry e = flushedEntry;
|
Entry e = flushedEntry;
|
||||||
if (e == null) {
|
if (e == null) {
|
||||||
return false;
|
return false;
|
||||||
@ -288,7 +293,7 @@ public final class ChannelOutboundBuffer {
|
|||||||
ReferenceCountUtil.safeRelease(msg);
|
ReferenceCountUtil.safeRelease(msg);
|
||||||
|
|
||||||
safeFail(promise, cause);
|
safeFail(promise, cause);
|
||||||
decrementPendingOutboundBytes(size, false);
|
decrementPendingOutboundBytes(size, false, notifyWritability);
|
||||||
}
|
}
|
||||||
|
|
||||||
// recycle the entry
|
// recycle the entry
|
||||||
@ -573,7 +578,7 @@ public final class ChannelOutboundBuffer {
|
|||||||
return flushed == 0;
|
return flushed == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void failFlushed(Throwable cause) {
|
void failFlushed(Throwable cause, boolean notify) {
|
||||||
// Make sure that this method does not reenter. A listener added to the current promise can be notified by the
|
// Make sure that this method does not reenter. A listener added to the current promise can be notified by the
|
||||||
// current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
|
// current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
|
||||||
// indirectly (usually by closing the channel.)
|
// indirectly (usually by closing the channel.)
|
||||||
@ -586,7 +591,7 @@ public final class ChannelOutboundBuffer {
|
|||||||
try {
|
try {
|
||||||
inFail = true;
|
inFail = true;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (!remove(cause)) {
|
if (!remove0(cause, notify)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user