Separate 'progress total' and 'pending number of bytes'

- Reverted the recent changes in AbstractChannel.calculateMessageSize()
This commit is contained in:
Trustin Lee 2013-07-19 14:09:08 +09:00
parent 4f6ba4fe3d
commit 19c92ceb59
2 changed files with 59 additions and 31 deletions

View File

@ -794,16 +794,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
* Calculate the number of bytes a message takes up in memory. Sub-classes may override this if they use different * Calculate the number of bytes a message takes up in memory. Sub-classes may override this if they use different
* messages then {@link ByteBuf} or {@link ByteBufHolder}. If the size can not be calculated 0 should be returned. * messages then {@link ByteBuf} or {@link ByteBufHolder}. If the size can not be calculated 0 should be returned.
*/ */
protected long calculateMessageSize(Object message) { protected int calculateMessageSize(Object message) {
if (message instanceof ByteBuf) { if (message instanceof ByteBuf) {
return ((ByteBuf) message).readableBytes(); return ((ByteBuf) message).readableBytes();
} }
if (message instanceof ByteBufHolder) { if (message instanceof ByteBufHolder) {
return ((ByteBufHolder) message).content().readableBytes(); return ((ByteBufHolder) message).content().readableBytes();
} }
if (message instanceof FileRegion) {
return ((FileRegion) message).count();
}
return 0; return 0;
} }

View File

@ -20,6 +20,7 @@
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle; import io.netty.util.Recycler.Handle;
@ -60,6 +61,7 @@ public final class ChannelOutboundBuffer {
// Flushed messages are stored in a circulas queue. // Flushed messages are stored in a circulas queue.
private Object[] flushed; private Object[] flushed;
private ChannelPromise[] flushedPromises; private ChannelPromise[] flushedPromises;
private int[] flushedPendingSizes;
private long[] flushedProgresses; private long[] flushedProgresses;
private long[] flushedTotals; private long[] flushedTotals;
private int head; private int head;
@ -72,11 +74,12 @@ public final class ChannelOutboundBuffer {
// Unflushed messages are stored in an array list. // Unflushed messages are stored in an array list.
private Object[] unflushed; private Object[] unflushed;
private ChannelPromise[] unflushedPromises; private ChannelPromise[] unflushedPromises;
private int[] unflushedPendingSizes;
private long[] unflushedTotals; private long[] unflushedTotals;
private int unflushedCount; private int unflushedCount;
private boolean inFail; private boolean inFail;
private long pendingOutboundBytes; private long totalPendingSize;
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER = private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "writable"); AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "writable");
@ -113,6 +116,7 @@ public final class ChannelOutboundBuffer {
flushed = new Object[initialCapacity]; flushed = new Object[initialCapacity];
flushedPromises = new ChannelPromise[initialCapacity]; flushedPromises = new ChannelPromise[initialCapacity];
flushedPendingSizes = new int[initialCapacity];
flushedProgresses = new long[initialCapacity]; flushedProgresses = new long[initialCapacity];
flushedTotals = new long[initialCapacity]; flushedTotals = new long[initialCapacity];
@ -120,6 +124,7 @@ public final class ChannelOutboundBuffer {
unflushed = new Object[initialCapacity]; unflushed = new Object[initialCapacity];
unflushedPromises = new ChannelPromise[initialCapacity]; unflushedPromises = new ChannelPromise[initialCapacity];
unflushedPendingSizes = new int[initialCapacity];
unflushedTotals = new long[initialCapacity]; unflushedTotals = new long[initialCapacity];
} }
@ -130,7 +135,7 @@ public final class ChannelOutboundBuffer {
if (unflushedCount != 0) { if (unflushedCount != 0) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
if (pendingOutboundBytes != 0) { if (totalPendingSize != 0) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
@ -145,15 +150,29 @@ public final class ChannelOutboundBuffer {
unflushed = this.unflushed; unflushed = this.unflushed;
} }
final long size = channel.calculateMessageSize(msg); final int size = channel.calculateMessageSize(msg);
incrementPendingOutboundBytes(size); incrementPendingOutboundBytes(size);
unflushed[unflushedCount] = msg; unflushed[unflushedCount] = msg;
unflushedPendingSizes[unflushedCount] = size;
unflushedPromises[unflushedCount] = promise; unflushedPromises[unflushedCount] = promise;
unflushedTotals[unflushedCount] = size; unflushedTotals[unflushedCount] = total(msg);
this.unflushedCount = unflushedCount + 1; this.unflushedCount = unflushedCount + 1;
} }
private static long total(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof FileRegion) {
return ((FileRegion) msg).count();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return -1;
}
private void doubleUnflushedCapacity() { private void doubleUnflushedCapacity() {
int newCapacity = unflushed.length << 1; int newCapacity = unflushed.length << 1;
if (newCapacity < 0) { if (newCapacity < 0) {
@ -170,9 +189,13 @@ public final class ChannelOutboundBuffer {
System.arraycopy(unflushedPromises, 0, a2, 0, unflushedCount); System.arraycopy(unflushedPromises, 0, a2, 0, unflushedCount);
unflushedPromises = a2; unflushedPromises = a2;
long[] a3 = new long[newCapacity]; int[] a3 = new int[newCapacity];
System.arraycopy(unflushedTotals, 0, a3, 0, unflushedCount); System.arraycopy(unflushedPendingSizes, 0, a3, 0, unflushedCount);
unflushedTotals = a3; unflushedPendingSizes = a3;
long[] a4 = new long[newCapacity];
System.arraycopy(unflushedTotals, 0, a4, 0, unflushedCount);
unflushedTotals = a4;
} }
void addFlush() { void addFlush() {
@ -183,10 +206,12 @@ public final class ChannelOutboundBuffer {
Object[] unflushed = this.unflushed; Object[] unflushed = this.unflushed;
ChannelPromise[] unflushedPromises = this.unflushedPromises; ChannelPromise[] unflushedPromises = this.unflushedPromises;
int[] unflushedPendingSizes = this.unflushedPendingSizes;
long[] unflushedTotals = this.unflushedTotals; long[] unflushedTotals = this.unflushedTotals;
Object[] flushed = this.flushed; Object[] flushed = this.flushed;
ChannelPromise[] flushedPromises = this.flushedPromises; ChannelPromise[] flushedPromises = this.flushedPromises;
int[] flushedPendingSizes = this.flushedPendingSizes;
long[] flushedProgresses = this.flushedProgresses; long[] flushedProgresses = this.flushedProgresses;
long[] flushedTotals = this.flushedTotals; long[] flushedTotals = this.flushedTotals;
int head = this.head; int head = this.head;
@ -195,6 +220,7 @@ public final class ChannelOutboundBuffer {
for (int i = 0; i < unflushedCount; i ++) { for (int i = 0; i < unflushedCount; i ++) {
flushed[tail] = unflushed[i]; flushed[tail] = unflushed[i];
flushedPromises[tail] = unflushedPromises[i]; flushedPromises[tail] = unflushedPromises[i];
flushedPendingSizes[tail] = unflushedPendingSizes[i];
flushedProgresses[tail] = 0; flushedProgresses[tail] = 0;
flushedTotals[tail] = unflushedTotals[i]; flushedTotals[tail] = unflushedTotals[i];
if ((tail = (tail + 1) & (flushed.length - 1)) == head) { if ((tail = (tail + 1) & (flushed.length - 1)) == head) {
@ -204,6 +230,7 @@ public final class ChannelOutboundBuffer {
tail = this.tail; tail = this.tail;
flushed = this.flushed; flushed = this.flushed;
flushedPromises = this.flushedPromises; flushedPromises = this.flushedPromises;
flushedPendingSizes = this.flushedPendingSizes;
flushedProgresses = this.flushedProgresses; flushedProgresses = this.flushedProgresses;
flushedTotals = this.flushedTotals; flushedTotals = this.flushedTotals;
} }
@ -235,26 +262,31 @@ public final class ChannelOutboundBuffer {
System.arraycopy(flushedPromises, 0, a2, r, p); System.arraycopy(flushedPromises, 0, a2, r, p);
flushedPromises = a2; flushedPromises = a2;
long[] a3 = new long[newCapacity]; int[] a3 = new int[newCapacity];
System.arraycopy(flushedProgresses, p, a3, 0, r); System.arraycopy(flushedPendingSizes, p, a3, 0, r);
System.arraycopy(flushedProgresses, 0, a3, r, p); System.arraycopy(flushedPendingSizes, 0, a3, r, p);
flushedProgresses = a3; flushedPendingSizes = a3;
long[] a4 = new long[newCapacity]; long[] a4 = new long[newCapacity];
System.arraycopy(flushedTotals, p, a4, 0, r); System.arraycopy(flushedProgresses, p, a4, 0, r);
System.arraycopy(flushedTotals, 0, a4, r, p); System.arraycopy(flushedProgresses, 0, a4, r, p);
flushedTotals = a4; flushedProgresses = a4;
long[] a5 = new long[newCapacity];
System.arraycopy(flushedTotals, p, a5, 0, r);
System.arraycopy(flushedTotals, 0, a5, r, p);
flushedTotals = a5;
head = 0; head = 0;
tail = n; tail = n;
} }
private void incrementPendingOutboundBytes(long size) { private void incrementPendingOutboundBytes(int size) {
if (size == 0) { if (size == 0) {
return; return;
} }
long newWriteBufferSize = pendingOutboundBytes += size; long newWriteBufferSize = totalPendingSize += size;
int highWaterMark = channel.config().getWriteBufferHighWaterMark(); int highWaterMark = channel.config().getWriteBufferHighWaterMark();
if (newWriteBufferSize > highWaterMark) { if (newWriteBufferSize > highWaterMark) {
@ -264,16 +296,15 @@ public final class ChannelOutboundBuffer {
} }
} }
private void decrementPendingOutboundBytes(long size) { private void decrementPendingOutboundBytes(int size) {
if (size == 0) { if (size == 0) {
return; return;
} }
long newWriteBufferSize = pendingOutboundBytes -= size; long newWriteBufferSize = totalPendingSize -= size;
int lowWaterMark = channel.config().getWriteBufferLowWaterMark(); int lowWaterMark = channel.config().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) { if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
channel.pipeline().fireChannelWritabilityChanged(); channel.pipeline().fireChannelWritabilityChanged();
} }
@ -309,8 +340,8 @@ public final class ChannelOutboundBuffer {
promise.trySuccess(); promise.trySuccess();
flushedPromises[head] = null; flushedPromises[head] = null;
decrementPendingOutboundBytes(flushedTotals[head]); decrementPendingOutboundBytes(flushedPendingSizes[head]);
flushedTotals[head] = 0; flushedPendingSizes[head] = 0;
this.head = head + 1 & flushed.length - 1; this.head = head + 1 & flushed.length - 1;
return true; return true;
@ -330,8 +361,8 @@ public final class ChannelOutboundBuffer {
safeFail(flushedPromises[head], cause); safeFail(flushedPromises[head], cause);
flushedPromises[head] = null; flushedPromises[head] = null;
decrementPendingOutboundBytes(flushedTotals[head]); decrementPendingOutboundBytes(flushedPendingSizes[head]);
flushedTotals[head] = 0; flushedPendingSizes[head] = 0;
this.head = head + 1 & flushed.length - 1; this.head = head + 1 & flushed.length - 1;
return true; return true;
@ -433,7 +464,7 @@ public final class ChannelOutboundBuffer {
} }
boolean getWritable() { boolean getWritable() {
return WRITABLE_UPDATER.get(this) == 1; return WRITABLE_UPDATER.get(this) != 0;
} }
public int size() { public int size() {
@ -454,7 +485,7 @@ public final class ChannelOutboundBuffer {
// Release all unflushed messages. // Release all unflushed messages.
Object[] unflushed = this.unflushed; Object[] unflushed = this.unflushed;
ChannelPromise[] unflushedPromises = this.unflushedPromises; ChannelPromise[] unflushedPromises = this.unflushedPromises;
long[] unflushedTotals = this.unflushedTotals; int[] unflushedPendingSizes = this.unflushedPendingSizes;
final int unflushedCount = this.unflushedCount; final int unflushedCount = this.unflushedCount;
try { try {
for (int i = 0; i < unflushedCount; i++) { for (int i = 0; i < unflushedCount; i++) {
@ -462,8 +493,8 @@ public final class ChannelOutboundBuffer {
unflushed[i] = null; unflushed[i] = null;
safeFail(unflushedPromises[i], cause); safeFail(unflushedPromises[i], cause);
unflushedPromises[i] = null; unflushedPromises[i] = null;
decrementPendingOutboundBytes(unflushedTotals[i]); decrementPendingOutboundBytes(unflushedPendingSizes[i]);
unflushedTotals[i] = 0; unflushedPendingSizes[i] = 0;
} }
} finally { } finally {
this.unflushedCount = 0; this.unflushedCount = 0;