diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 5e26506522..9df4eb1558 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -212,18 +212,25 @@ final class ChannelOutboundBuffer { } void clearUnflushed(Throwable cause) { + if (inFail) { + return; + } + MessageList unflushed = unflushedMessageList; if (unflushed == null) { return; } + inFail = true; + // Release all unflushed messages. Object[] messages = unflushed.messages(); ChannelPromise[] promises = unflushed.promises(); final int size = unflushed.size(); try { for (int i = 0; i < size; i++) { - ReferenceCountUtil.release(messages[i]); + safeRelease(messages[i]); + ChannelPromise p = promises[i]; if (!(p instanceof VoidChannelPromise)) { if (!p.tryFailure(cause)) { @@ -235,6 +242,7 @@ final class ChannelOutboundBuffer { unflushed.recycle(); decrementPendingOutboundBytes(unflushedMessageListSize); unflushedMessageListSize = 0; + inFail = false; } } @@ -271,7 +279,8 @@ final class ChannelOutboundBuffer { final int size = current.size(); try { for (int i = currentMessageIndex; i < size; i++) { - ReferenceCountUtil.release(messages[i]); + safeRelease(messages[i]); + ChannelPromise p = promises[i]; if (!(p instanceof VoidChannelPromise) && !p.tryFailure(cause)) { logger.warn("Promise done already: {} - new exception is:", p, cause); @@ -286,4 +295,12 @@ final class ChannelOutboundBuffer { inFail = false; } } + + private static void safeRelease(Object message) { + try { + ReferenceCountUtil.release(message); + } catch (Throwable t) { + logger.warn("Failed to release a message.", t); + } + } }