Fix data structure corruption and resource leak in ChannelOutboundBuffer

This commit is contained in:
Trustin Lee 2013-07-17 21:02:20 +09:00
parent 66c4c07ec0
commit a8d67b0282
2 changed files with 76 additions and 49 deletions

View File

@ -498,15 +498,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
promise.setFailure(t); promise.setFailure(t);
} }
if (closedChannelException == null) { if (!outboundBuffer.isEmpty()) {
closedChannelException = new ClosedChannelException(); // fail all queued messages
} if (closedChannelException == null) {
closedChannelException = new ClosedChannelException();
// fail all queued messages }
if (outboundBuffer.next()) {
outboundBuffer.fail(closedChannelException); outboundBuffer.fail(closedChannelException);
} }
outboundBuffer.clearUnflushed();
if (wasActive && !isActive()) { if (wasActive && !isActive()) {
invokeLater(new Runnable() { invokeLater(new Runnable() {
@Override @Override
@ -620,12 +621,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try { try {
for (;;) { for (;;) {
MessageList messages = outboundBuffer.currentMessages; MessageList messages = outboundBuffer.currentMessageList;
if (messages == null) { if (messages == null) {
if (!outboundBuffer.next()) { if (!outboundBuffer.next()) {
break; break;
} }
messages = outboundBuffer.currentMessages; messages = outboundBuffer.currentMessageList;
} }
int messageIndex = outboundBuffer.currentMessageIndex; int messageIndex = outboundBuffer.currentMessageIndex;

View File

@ -31,13 +31,16 @@ final class ChannelOutboundBuffer {
private static final int MIN_INITIAL_CAPACITY = 8; private static final int MIN_INITIAL_CAPACITY = 8;
MessageList currentMessages; MessageList currentMessageList;
int currentMessageIndex; int currentMessageIndex;
private long currentMessageListSize; private long currentMessageListSize;
private MessageList[] messages; private MessageList[] messageLists;
private long[] messageListSizes; private long[] messageListSizes;
private MessageList unflushedMessageList;
private long unflushedMessageListSize;
private int head; private int head;
private int tail; private int tail;
private boolean inFail; private boolean inFail;
@ -77,32 +80,39 @@ final class ChannelOutboundBuffer {
initialCapacity = MIN_INITIAL_CAPACITY; initialCapacity = MIN_INITIAL_CAPACITY;
} }
messages = new MessageList[initialCapacity];
messageListSizes = new long[initialCapacity];
this.channel = channel; this.channel = channel;
messageLists = new MessageList[initialCapacity];
messageListSizes = new long[initialCapacity];
} }
void addMessage(Object msg, ChannelPromise promise) { void addMessage(Object msg, ChannelPromise promise) {
int tail = this.tail; MessageList unflushedMessageList = this.unflushedMessageList;
MessageList msgs = messages[tail]; if (unflushedMessageList == null) {
if (msgs == null) { this.unflushedMessageList = unflushedMessageList = MessageList.newInstance();
messages[tail] = msgs = MessageList.newInstance();
} }
msgs.add(msg, promise); unflushedMessageList.add(msg, promise);
int size = channel.calculateMessageSize(msg); int size = channel.calculateMessageSize(msg);
messageListSizes[tail] += size; unflushedMessageListSize += size;
incrementPendingOutboundBytes(size); incrementPendingOutboundBytes(size);
} }
void addFlush() { void addFlush() {
int tail = this.tail; MessageList unflushedMessageList = this.unflushedMessageList;
if (messages[tail] == null) { if (unflushedMessageList == null) {
return; return;
} }
if ((this.tail = tail + 1 & messages.length - 1) == head) { int tail = this.tail;
messageLists[tail] = unflushedMessageList;
messageListSizes[tail] = unflushedMessageListSize;
this.unflushedMessageList = null;
unflushedMessageListSize = 0;
if ((this.tail = (tail + 1) & (messageLists.length - 1)) == head) {
doubleCapacity(); doubleCapacity();
} }
} }
@ -142,7 +152,7 @@ final class ChannelOutboundBuffer {
assert head == tail; assert head == tail;
int p = head; int p = head;
int n = messages.length; int n = messageLists.length;
int r = n - p; // number of elements to the right of p int r = n - p; // number of elements to the right of p
int newCapacity = n << 1; int newCapacity = n << 1;
if (newCapacity < 0) { if (newCapacity < 0) {
@ -151,9 +161,9 @@ final class ChannelOutboundBuffer {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
MessageList[] a1 = new MessageList[newCapacity]; MessageList[] a1 = new MessageList[newCapacity];
System.arraycopy(messages, p, a1, 0, r); System.arraycopy(messageLists, p, a1, 0, r);
System.arraycopy(messages, 0, a1, r, p); System.arraycopy(messageLists, 0, a1, r, p);
messages = a1; messageLists = a1;
long[] a2 = new long[newCapacity]; long[] a2 = new long[newCapacity];
System.arraycopy(messageListSizes, p, a2, 0, r); System.arraycopy(messageListSizes, p, a2, 0, r);
@ -171,21 +181,21 @@ final class ChannelOutboundBuffer {
int h = head; int h = head;
MessageList e = messages[h]; // Element is null if deque empty MessageList e = messageLists[h]; // Element is null if deque empty
if (e == null) { if (e == null) {
currentMessageListSize = 0; currentMessageListSize = 0;
currentMessages = null; currentMessageList = null;
return false; return false;
} }
currentMessages = messages[h]; currentMessageList = messageLists[h];
currentMessageIndex = 0; currentMessageIndex = 0;
currentMessageListSize = messageListSizes[h]; currentMessageListSize = messageListSizes[h];
messages[h] = null; messageLists[h] = null;
messageListSizes[h] = 0; messageListSizes[h] = 0;
head = h + 1 & messages.length - 1; head = h + 1 & messageLists.length - 1;
return true; return true;
} }
@ -194,25 +204,41 @@ final class ChannelOutboundBuffer {
} }
int size() { int size() {
return tail - head & messages.length - 1; return tail - head & messageLists.length - 1;
} }
boolean isEmpty() { boolean isEmpty() {
return head == tail; return head == tail;
} }
void clear() { void clearUnflushed() {
int head = this.head; MessageList unflushed = unflushedMessageList;
int tail = this.tail; if (unflushed == null) {
if (head != tail) { return;
this.head = this.tail = 0; }
final int mask = messages.length - 1;
int i = head; // Release all unflushed messages.
do { Object[] messages = unflushed.messages();
messages[i] = null; ChannelPromise[] promises = unflushed.promises();
messageListSizes[i] = 0; final int size = unflushed.size();
i = i + 1 & mask; Throwable flushAborted = null;
} while (i != tail); try {
for (int i = 0; i < size; i++) {
ReferenceCountUtil.release(messages[i]);
ChannelPromise p = promises[i];
if (!(p instanceof VoidChannelPromise)) {
if (flushAborted == null) {
flushAborted = new ChannelException("write() aborted without flush()");
}
if (!p.tryFailure(flushAborted)) {
logger.warn("Promise done already: {} - new exception is:", p, flushAborted);
}
}
}
} finally {
unflushed.recycle();
decrementPendingOutboundBytes(unflushedMessageListSize);
unflushedMessageListSize = 0;
} }
} }
@ -228,25 +254,25 @@ final class ChannelOutboundBuffer {
try { try {
inFail = true; inFail = true;
if (currentMessages == null) { if (currentMessageList == null) {
if (!next()) { if (!next()) {
return; return;
} }
} }
do { do {
if (currentMessages != null) { if (currentMessageList != null) {
// Store a local reference of current messages // Store a local reference of current messages
// This is needed as a promise may have a listener attached that will close the channel // This is needed as a promise may have a listener attached that will close the channel
// The close will call next() which will set currentMessages to null and so // The close will call next() which will set currentMessages to null and so
// trigger a NPE in the finally block if no local reference is used. // trigger a NPE in the finally block if no local reference is used.
// //
// See https://github.com/netty/netty/issues/1573 // See https://github.com/netty/netty/issues/1573
MessageList current = currentMessages; MessageList current = currentMessageList;
// Release all failed messages. // Release all failed messages.
Object[] messages = currentMessages.messages(); Object[] messages = current.messages();
ChannelPromise[] promises = currentMessages.promises(); ChannelPromise[] promises = current.promises();
final int size = currentMessages.size(); final int size = current.size();
try { try {
for (int i = currentMessageIndex; i < size; i++) { for (int i = currentMessageIndex; i < size; i++) {
ReferenceCountUtil.release(messages[i]); ReferenceCountUtil.release(messages[i]);