Make the cleanup logic in ChannelOutboundBuffer more robust
- Fixes #1601
This commit is contained in:
parent
9c8d980a74
commit
e28594952b
@ -212,18 +212,25 @@ final class ChannelOutboundBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void clearUnflushed(Throwable cause) {
|
void clearUnflushed(Throwable cause) {
|
||||||
|
if (inFail) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
MessageList unflushed = unflushedMessageList;
|
MessageList unflushed = unflushedMessageList;
|
||||||
if (unflushed == null) {
|
if (unflushed == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inFail = true;
|
||||||
|
|
||||||
// Release all unflushed messages.
|
// Release all unflushed messages.
|
||||||
Object[] messages = unflushed.messages();
|
Object[] messages = unflushed.messages();
|
||||||
ChannelPromise[] promises = unflushed.promises();
|
ChannelPromise[] promises = unflushed.promises();
|
||||||
final int size = unflushed.size();
|
final int size = unflushed.size();
|
||||||
try {
|
try {
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
ReferenceCountUtil.release(messages[i]);
|
safeRelease(messages[i]);
|
||||||
|
|
||||||
ChannelPromise p = promises[i];
|
ChannelPromise p = promises[i];
|
||||||
if (!(p instanceof VoidChannelPromise)) {
|
if (!(p instanceof VoidChannelPromise)) {
|
||||||
if (!p.tryFailure(cause)) {
|
if (!p.tryFailure(cause)) {
|
||||||
@ -235,6 +242,7 @@ final class ChannelOutboundBuffer {
|
|||||||
unflushed.recycle();
|
unflushed.recycle();
|
||||||
decrementPendingOutboundBytes(unflushedMessageListSize);
|
decrementPendingOutboundBytes(unflushedMessageListSize);
|
||||||
unflushedMessageListSize = 0;
|
unflushedMessageListSize = 0;
|
||||||
|
inFail = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -271,7 +279,8 @@ final class ChannelOutboundBuffer {
|
|||||||
final int size = current.size();
|
final int size = current.size();
|
||||||
try {
|
try {
|
||||||
for (int i = currentMessageIndex; i < size; i++) {
|
for (int i = currentMessageIndex; i < size; i++) {
|
||||||
ReferenceCountUtil.release(messages[i]);
|
safeRelease(messages[i]);
|
||||||
|
|
||||||
ChannelPromise p = promises[i];
|
ChannelPromise p = promises[i];
|
||||||
if (!(p instanceof VoidChannelPromise) && !p.tryFailure(cause)) {
|
if (!(p instanceof VoidChannelPromise) && !p.tryFailure(cause)) {
|
||||||
logger.warn("Promise done already: {} - new exception is:", p, cause);
|
logger.warn("Promise done already: {} - new exception is:", p, cause);
|
||||||
@ -286,4 +295,12 @@ final class ChannelOutboundBuffer {
|
|||||||
inFail = false;
|
inFail = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void safeRelease(Object message) {
|
||||||
|
try {
|
||||||
|
ReferenceCountUtil.release(message);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("Failed to release a message.", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user