From a8d67b02825d6fbb1d145c7aaee8382f5f648035 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 17 Jul 2013 21:02:20 +0900 Subject: [PATCH] Fix data structure corruption and resource leak in ChannelOutboundBuffer --- .../io/netty/channel/AbstractChannel.java | 17 +-- .../netty/channel/ChannelOutboundBuffer.java | 108 +++++++++++------- 2 files changed, 76 insertions(+), 49 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 3a5f528d4f..d0cdec0342 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -498,15 +498,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha promise.setFailure(t); } - if (closedChannelException == null) { - closedChannelException = new ClosedChannelException(); - } - - // fail all queued messages - if (outboundBuffer.next()) { + if (!outboundBuffer.isEmpty()) { + // fail all queued messages + if (closedChannelException == null) { + closedChannelException = new ClosedChannelException(); + } outboundBuffer.fail(closedChannelException); } + outboundBuffer.clearUnflushed(); + if (wasActive && !isActive()) { invokeLater(new Runnable() { @Override @@ -620,12 +621,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { for (;;) { - MessageList messages = outboundBuffer.currentMessages; + MessageList messages = outboundBuffer.currentMessageList; if (messages == null) { if (!outboundBuffer.next()) { break; } - messages = outboundBuffer.currentMessages; + messages = outboundBuffer.currentMessageList; } int messageIndex = outboundBuffer.currentMessageIndex; diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 017d072b3c..175f554187 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -31,13 +31,16 @@ final class ChannelOutboundBuffer { private static final int MIN_INITIAL_CAPACITY = 8; - MessageList currentMessages; + MessageList currentMessageList; int currentMessageIndex; private long currentMessageListSize; - private MessageList[] messages; + private MessageList[] messageLists; private long[] messageListSizes; + private MessageList unflushedMessageList; + private long unflushedMessageListSize; + private int head; private int tail; private boolean inFail; @@ -77,32 +80,39 @@ final class ChannelOutboundBuffer { initialCapacity = MIN_INITIAL_CAPACITY; } - messages = new MessageList[initialCapacity]; - messageListSizes = new long[initialCapacity]; this.channel = channel; + + messageLists = new MessageList[initialCapacity]; + messageListSizes = new long[initialCapacity]; } void addMessage(Object msg, ChannelPromise promise) { - int tail = this.tail; - MessageList msgs = messages[tail]; - if (msgs == null) { - messages[tail] = msgs = MessageList.newInstance(); + MessageList unflushedMessageList = this.unflushedMessageList; + if (unflushedMessageList == null) { + this.unflushedMessageList = unflushedMessageList = MessageList.newInstance(); } - msgs.add(msg, promise); + unflushedMessageList.add(msg, promise); int size = channel.calculateMessageSize(msg); - messageListSizes[tail] += size; + unflushedMessageListSize += size; incrementPendingOutboundBytes(size); } void addFlush() { - int tail = this.tail; - if (messages[tail] == null) { + MessageList unflushedMessageList = this.unflushedMessageList; + if (unflushedMessageList == null) { return; } - if ((this.tail = tail + 1 & messages.length - 1) == head) { + int tail = this.tail; + + messageLists[tail] = unflushedMessageList; + messageListSizes[tail] = unflushedMessageListSize; + this.unflushedMessageList = null; + unflushedMessageListSize = 0; + + if ((this.tail = (tail + 1) & (messageLists.length - 1)) == head) { doubleCapacity(); } } @@ -142,7 +152,7 @@ final class ChannelOutboundBuffer { assert head == tail; int p = head; - int n = messages.length; + int n = messageLists.length; int r = n - p; // number of elements to the right of p int newCapacity = n << 1; if (newCapacity < 0) { @@ -151,9 +161,9 @@ final class ChannelOutboundBuffer { @SuppressWarnings("unchecked") MessageList[] a1 = new MessageList[newCapacity]; - System.arraycopy(messages, p, a1, 0, r); - System.arraycopy(messages, 0, a1, r, p); - messages = a1; + System.arraycopy(messageLists, p, a1, 0, r); + System.arraycopy(messageLists, 0, a1, r, p); + messageLists = a1; long[] a2 = new long[newCapacity]; System.arraycopy(messageListSizes, p, a2, 0, r); @@ -171,21 +181,21 @@ final class ChannelOutboundBuffer { int h = head; - MessageList e = messages[h]; // Element is null if deque empty + MessageList e = messageLists[h]; // Element is null if deque empty if (e == null) { currentMessageListSize = 0; - currentMessages = null; + currentMessageList = null; return false; } - currentMessages = messages[h]; + currentMessageList = messageLists[h]; currentMessageIndex = 0; currentMessageListSize = messageListSizes[h]; - messages[h] = null; + messageLists[h] = null; messageListSizes[h] = 0; - head = h + 1 & messages.length - 1; + head = h + 1 & messageLists.length - 1; return true; } @@ -194,25 +204,41 @@ final class ChannelOutboundBuffer { } int size() { - return tail - head & messages.length - 1; + return tail - head & messageLists.length - 1; } boolean isEmpty() { return head == tail; } - void clear() { - int head = this.head; - int tail = this.tail; - if (head != tail) { - this.head = this.tail = 0; - final int mask = messages.length - 1; - int i = head; - do { - messages[i] = null; - messageListSizes[i] = 0; - i = i + 1 & mask; - } while (i != tail); + void clearUnflushed() { + MessageList unflushed = unflushedMessageList; + if (unflushed == null) { + return; + } + + // Release all unflushed messages. + Object[] messages = unflushed.messages(); + ChannelPromise[] promises = unflushed.promises(); + final int size = unflushed.size(); + Throwable flushAborted = null; + try { + for (int i = 0; i < size; i++) { + ReferenceCountUtil.release(messages[i]); + ChannelPromise p = promises[i]; + if (!(p instanceof VoidChannelPromise)) { + if (flushAborted == null) { + flushAborted = new ChannelException("write() aborted without flush()"); + } + if (!p.tryFailure(flushAborted)) { + logger.warn("Promise done already: {} - new exception is:", p, flushAborted); + } + } + } + } finally { + unflushed.recycle(); + decrementPendingOutboundBytes(unflushedMessageListSize); + unflushedMessageListSize = 0; } } @@ -228,25 +254,25 @@ final class ChannelOutboundBuffer { try { inFail = true; - if (currentMessages == null) { + if (currentMessageList == null) { if (!next()) { return; } } do { - if (currentMessages != null) { + if (currentMessageList != null) { // Store a local reference of current messages // This is needed as a promise may have a listener attached that will close the channel // The close will call next() which will set currentMessages to null and so // trigger a NPE in the finally block if no local reference is used. // // See https://github.com/netty/netty/issues/1573 - MessageList current = currentMessages; + MessageList current = currentMessageList; // Release all failed messages. - Object[] messages = currentMessages.messages(); - ChannelPromise[] promises = currentMessages.promises(); - final int size = currentMessages.size(); + Object[] messages = current.messages(); + ChannelPromise[] promises = current.promises(); + final int size = current.size(); try { for (int i = currentMessageIndex; i < size; i++) { ReferenceCountUtil.release(messages[i]);