[#2693] Reduce memory usage of ChannelOutboundBuffer

Motiviation:

ChannelOuboundBuffer uses often too much memory. This is especially a problem if you want to serve a lot of connections. This is due the fact that it uses 2 arrays internally. One if used as a circular buffer and store the Entries that are never released  (ChannelOutboundBuffer is pooled) and one is used to hold the ByteBuffers that are used for gathering writes.

Modifications:

Rewrite ChannelOutboundBuffer to remove these two arrays by:
  - Make Entry recyclable and use it as linked Node
  - Remove the circular buffer which was used for the Entries as we use a Linked-List like structure now
  - Remove the array that did hold the ByteBuffers and replace it by an ByteBuffer array that is hold by a FastThreadLocal. We use a fixed capacity of 1024 here which is fine as we share these anyway.
  - ChannelOuboundBuffer is not recyclable anymore as it is now a "light-weight" object. We recycle the internally used Entries instead.

Result:

Less memory footprint and resource usage. Performance seems to be a bit better but most likely as we not need to expand any arrays anymore.

Benchmark before change:
[nmaurer@xxx]~% wrk/wrk -H 'Host: localhost' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' -H 'Connection: keep-alive' -d 120 -c 256 -t 16 --pipeline 256  http://xxx:8080/plaintext
Running 2m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    26.88ms   67.47ms   1.26s    97.97%
    Req/Sec   191.81k    28.22k  255.63k    83.86%
  364806639 requests in 2.00m, 48.92GB read
Requests/sec: 3040101.23
Transfer/sec:    417.49MB

Benchmark after change:

[nmaurer@xxx]~% wrk/wrk -H 'Host: localhost' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' -H 'Connection: keep-alive' -d 120 -c 256 -t 16 --pipeline 256  http://xxx:8080/plaintext
Running 2m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    22.22ms   17.22ms 301.77ms   90.13%
    Req/Sec   194.98k    41.98k  328.38k    70.50%
  371816023 requests in 2.00m, 49.86GB read
Requests/sec: 3098461.44
Transfer/sec:    425.51MB
This commit is contained in:
Norman Maurer 2014-07-24 10:39:27 +02:00
parent c90de50ea7
commit 73dfd7c01b
3 changed files with 160 additions and 248 deletions

View File

@ -374,7 +374,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/ */
protected abstract class AbstractUnsafe implements Unsafe { protected abstract class AbstractUnsafe implements Unsafe {
private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(AbstractChannel.this); private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private boolean inFlush0; private boolean inFlush0;
@Override @Override

View File

@ -29,6 +29,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle; import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -36,7 +37,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@ -48,47 +48,35 @@ public final class ChannelOutboundBuffer {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
private static final int INITIAL_CAPACITY =
SystemPropertyUtil.getInt("io.netty.outboundBufferInitialCapacity", 4);
private static final int threadLocalDirectBufferSize = private static final int threadLocalDirectBufferSize =
SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024); SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
static { static {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.outboundBufferInitialCapacity: {}", INITIAL_CAPACITY);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize); logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize);
} }
} }
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() { private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
@Override @Override
protected ChannelOutboundBuffer newObject(Handle handle) { protected ByteBuffer[] initialValue() throws Exception {
return new ChannelOutboundBuffer(handle); return new ByteBuffer[1024];
} }
}; };
static ChannelOutboundBuffer newInstance(AbstractChannel channel) { private final AbstractChannel channel;
ChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
buffer.totalPendingSize = 0;
buffer.writable = 1;
return buffer;
}
private final Handle handle; // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
private AbstractChannel channel; // The Entry that is the first in the linked-list structure that was flushed
private Entry flushedEntry;
// A circular buffer used to store messages. The buffer is arranged such that: flushed <= unflushed <= tail. The // The Entry which is the first unflushed in the linked-list structure
// flushed messages are stored in the range [flushed, unflushed). Unflushed messages are stored in the range private Entry unflushedEntry;
// [unflushed, tail). // The Entry which represents the tail of the buffer
private Entry[] buffer; private Entry tailEntry;
// The number of flushed entries that are not written yet
private int flushed; private int flushed;
private int unflushed;
private int tail;
private ByteBuffer[] nioBuffers;
private int nioBufferCount; private int nioBufferCount;
private long nioBufferSize; private long nioBufferSize;
@ -96,10 +84,14 @@ public final class ChannelOutboundBuffer {
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER; private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER;
@SuppressWarnings("unused")
private volatile long totalPendingSize; private volatile long totalPendingSize;
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER; private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER;
@SuppressWarnings("FieldMayBeFinal")
private volatile int writable = 1;
static { static {
AtomicIntegerFieldUpdater<ChannelOutboundBuffer> writableUpdater = AtomicIntegerFieldUpdater<ChannelOutboundBuffer> writableUpdater =
PlatformDependent.newAtomicIntegerFieldUpdater(ChannelOutboundBuffer.class, "writable"); PlatformDependent.newAtomicIntegerFieldUpdater(ChannelOutboundBuffer.class, "writable");
@ -116,17 +108,8 @@ public final class ChannelOutboundBuffer {
TOTAL_PENDING_SIZE_UPDATER = pendingSizeUpdater; TOTAL_PENDING_SIZE_UPDATER = pendingSizeUpdater;
} }
private volatile int writable = 1; ChannelOutboundBuffer(AbstractChannel channel) {
this.channel = channel;
private ChannelOutboundBuffer(Handle handle) {
this.handle = handle;
buffer = new Entry[INITIAL_CAPACITY];
for (int i = 0; i < buffer.length; i++) {
buffer[i] = new Entry();
}
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
} }
void addMessage(Object msg, ChannelPromise promise) { void addMessage(Object msg, ChannelPromise promise) {
@ -135,16 +118,17 @@ public final class ChannelOutboundBuffer {
size = 0; size = 0;
} }
Entry e = buffer[tail++]; Entry entry = Entry.newInstance(msg, size, total(msg), promise);
e.msg = msg; if (tailEntry == null) {
e.pendingSize = size; flushedEntry = null;
e.promise = promise; tailEntry = entry;
e.total = total(msg); } else {
Entry tail = tailEntry;
tail &= buffer.length - 1; tail.next = entry;
tailEntry = entry;
if (tail == flushed) { }
addCapacity(); if (unflushedEntry == null) {
unflushedEntry = entry;
} }
// increment pending bytes after adding message to the unflushed arrays. // increment pending bytes after adding message to the unflushed arrays.
@ -152,49 +136,29 @@ public final class ChannelOutboundBuffer {
incrementPendingOutboundBytes(size); incrementPendingOutboundBytes(size);
} }
private void addCapacity() {
int p = flushed;
int n = buffer.length;
int r = n - p; // number of elements to the right of p
int s = size();
int newCapacity = n << 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
Entry[] e = new Entry[newCapacity];
System.arraycopy(buffer, p, e, 0, r);
System.arraycopy(buffer, 0, e, r, p);
for (int i = n; i < e.length; i++) {
e[i] = new Entry();
}
buffer = e;
flushed = 0;
unflushed = s;
tail = n;
}
void addFlush() { void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages // There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime. // where added in the meantime.
// //
// See https://github.com/netty/netty/issues/2577 // See https://github.com/netty/netty/issues/2577
if (unflushed != tail) { Entry entry = unflushedEntry;
unflushed = tail; if (entry != null) {
if (flushedEntry == null) {
final int mask = buffer.length - 1; // there is no flushedEntry yet, so start with the entry
int i = flushed; flushedEntry = entry;
while (i != unflushed && buffer[i].msg != null) { }
Entry entry = buffer[i]; do {
flushed ++;
if (!entry.promise.setUncancellable()) { if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes // Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel(); int pending = entry.cancel();
decrementPendingOutboundBytes(pending); decrementPendingOutboundBytes(pending);
} }
i = i + 1 & mask; entry = entry.next;
} } while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
} }
} }
@ -203,10 +167,7 @@ public final class ChannelOutboundBuffer {
* This method is thread-safe! * This method is thread-safe!
*/ */
void incrementPendingOutboundBytes(int size) { void incrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets if (size == 0) {
// recycled while process this method.
Channel channel = this.channel;
if (size == 0 || channel == null) {
return; return;
} }
@ -223,10 +184,7 @@ public final class ChannelOutboundBuffer {
* This method is thread-safe! * This method is thread-safe!
*/ */
void decrementPendingOutboundBytes(int size) { void decrementPendingOutboundBytes(int size) {
// Cache the channel and check for null to make sure we not produce a NPE in case of the Channel gets if (size == 0) {
// recycled while process this method.
Channel channel = this.channel;
if (size == 0 || channel == null) {
return; return;
} }
@ -256,42 +214,41 @@ public final class ChannelOutboundBuffer {
} }
public Object current(boolean preferDirect) { public Object current(boolean preferDirect) {
if (isEmpty()) { // TODO: Think of a smart way to handle ByteBufHolder messages
Entry entry = flushedEntry;
if (entry == null) {
return null; return null;
} else { }
// TODO: Think of a smart way to handle ByteBufHolder messages Object msg = entry.msg;
Entry entry = buffer[flushed]; if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
Object msg = entry.msg;
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
return msg;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return buf;
} else {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
return buf;
}
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we use a ThreadLocal based pool.
ByteBufAllocator alloc = channel.alloc();
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
current(directBuf);
return directBuf;
}
}
return msg; return msg;
} }
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return buf;
} else {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
return buf;
}
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we use a ThreadLocal based pool.
ByteBufAllocator alloc = channel.alloc();
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
current(directBuf);
return directBuf;
}
}
return msg;
} }
/** /**
@ -299,13 +256,15 @@ public final class ChannelOutboundBuffer {
* The replaced msg will automatically be released * The replaced msg will automatically be released
*/ */
public void current(Object msg) { public void current(Object msg) {
Entry entry = buffer[flushed]; Entry entry = flushedEntry;
assert entry != null;
safeRelease(entry.msg); safeRelease(entry.msg);
entry.msg = msg; entry.msg = msg;
} }
public void progress(long amount) { public void progress(long amount) {
Entry e = buffer[flushed]; Entry e = flushedEntry;
assert e != null;
ChannelPromise p = e.promise; ChannelPromise p = e.promise;
if (p instanceof ChannelProgressivePromise) { if (p instanceof ChannelProgressivePromise) {
long progress = e.progress + amount; long progress = e.progress + amount;
@ -315,22 +274,16 @@ public final class ChannelOutboundBuffer {
} }
public boolean remove() { public boolean remove() {
if (isEmpty()) { Entry e = flushedEntry;
if (e == null) {
return false; return false;
} }
Entry e = buffer[flushed];
Object msg = e.msg; Object msg = e.msg;
if (msg == null) {
return false;
}
ChannelPromise promise = e.promise; ChannelPromise promise = e.promise;
int size = e.pendingSize; int size = e.pendingSize;
e.clear(); removeEntry(e);
flushed = flushed + 1 & buffer.length - 1;
if (!e.cancelled) { if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before. // only release message, notify and decrement if it was not canceled before.
@ -339,26 +292,23 @@ public final class ChannelOutboundBuffer {
decrementPendingOutboundBytes(size); decrementPendingOutboundBytes(size);
} }
// recycle the entry
e.recycleAndGetNext();
return true; return true;
} }
public boolean remove(Throwable cause) { public boolean remove(Throwable cause) {
if (isEmpty()) { Entry e = flushedEntry;
if (e == null) {
return false; return false;
} }
Entry e = buffer[flushed];
Object msg = e.msg; Object msg = e.msg;
if (msg == null) {
return false;
}
ChannelPromise promise = e.promise; ChannelPromise promise = e.promise;
int size = e.pendingSize; int size = e.pendingSize;
e.clear(); removeEntry(e);
flushed = flushed + 1 & buffer.length - 1;
if (!e.cancelled) { if (!e.cancelled) {
// only release message, fail and decrement if it was not canceled before. // only release message, fail and decrement if it was not canceled before.
@ -368,9 +318,25 @@ public final class ChannelOutboundBuffer {
decrementPendingOutboundBytes(size); decrementPendingOutboundBytes(size);
} }
// recycle the entry
e.recycleAndGetNext();
return true; return true;
} }
private void removeEntry(Entry e) {
if (e == tailEntry) {
// processed everything
tailEntry = null;
unflushedEntry = null;
}
if (-- flushed == 0) {
flushedEntry = null;
} else {
flushedEntry = e.next;
}
}
/** /**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only. * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and * {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and
@ -385,22 +351,12 @@ public final class ChannelOutboundBuffer {
public ByteBuffer[] nioBuffers() { public ByteBuffer[] nioBuffers() {
long nioBufferSize = 0; long nioBufferSize = 0;
int nioBufferCount = 0; int nioBufferCount = 0;
final int mask = buffer.length - 1;
final ByteBufAllocator alloc = channel.alloc(); final ByteBufAllocator alloc = channel.alloc();
ByteBuffer[] nioBuffers = this.nioBuffers; ByteBuffer[] nioBuffers = NIO_BUFFERS.get();
Object m; Entry entry = flushedEntry;
int i = flushed; while (entry != null && entry.msg instanceof ByteBuf) {
while (i != unflushed && (m = buffer[i].msg) != null) {
if (!(m instanceof ByteBuf)) {
// Just break out of the loop as we can still use gathering writes for the buffers that we
// found by now.
break;
}
Entry entry = buffer[i];
if (!entry.cancelled) { if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) m; ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex(); final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex; final int readableBytes = buf.writerIndex() - readerIndex;
@ -413,8 +369,7 @@ public final class ChannelOutboundBuffer {
} }
int neededSpace = nioBufferCount + count; int neededSpace = nioBufferCount + count;
if (neededSpace > nioBuffers.length) { if (neededSpace > nioBuffers.length) {
this.nioBuffers = nioBuffers = break;
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
} }
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
if (count == 1) { if (count == 1) {
@ -440,8 +395,7 @@ public final class ChannelOutboundBuffer {
} }
} }
} }
entry = entry.next;
i = i + 1 & mask;
} }
this.nioBufferCount = nioBufferCount; this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize; this.nioBufferSize = nioBufferSize;
@ -477,25 +431,6 @@ public final class ChannelOutboundBuffer {
return nioBufferCount; return nioBufferCount;
} }
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (neededSpace > newCapacity);
ByteBuffer[] newArray = new ByteBuffer[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);
return newArray;
}
public int nioBufferCount() { public int nioBufferCount() {
return nioBufferCount; return nioBufferCount;
} }
@ -509,11 +444,11 @@ public final class ChannelOutboundBuffer {
} }
public int size() { public int size() {
return unflushed - flushed & buffer.length - 1; return flushed;
} }
public boolean isEmpty() { public boolean isEmpty() {
return unflushed == flushed; return flushed == 0;
} }
void failFlushed(Throwable cause) { void failFlushed(Throwable cause) {
@ -560,29 +495,22 @@ public final class ChannelOutboundBuffer {
} }
// Release all unflushed messages. // Release all unflushed messages.
final int unflushedCount = tail - unflushed & buffer.length - 1;
try { try {
for (int i = 0; i < unflushedCount; i++) { Entry e = unflushedEntry;
Entry e = buffer[unflushed + i & buffer.length - 1]; while (e != null) {
// Just decrease; do not trigger any events via decrementPendingOutboundBytes() // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
int size = e.pendingSize; int size = e.pendingSize;
TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
e.pendingSize = 0;
if (!e.cancelled) { if (!e.cancelled) {
safeRelease(e.msg); safeRelease(e.msg);
safeFail(e.promise, cause); safeFail(e.promise, cause);
} }
e.msg = null; e = e.recycleAndGetNext();
e.promise = null;
} }
} finally { } finally {
tail = unflushed;
inFail = false; inFail = false;
} }
recycle();
} }
private static void safeRelease(Object message) { private static void safeRelease(Object message) {
@ -605,38 +533,25 @@ public final class ChannelOutboundBuffer {
} }
} }
@Deprecated
public void recycle() { public void recycle() {
if (buffer.length > INITIAL_CAPACITY) { // NOOP
Entry[] e = new Entry[INITIAL_CAPACITY];
System.arraycopy(buffer, 0, e, 0, INITIAL_CAPACITY);
buffer = e;
}
if (nioBuffers.length > INITIAL_CAPACITY) {
nioBuffers = new ByteBuffer[INITIAL_CAPACITY];
} else {
// null out the nio buffers array so the can be GC'ed
// https://github.com/netty/netty/issues/1763
Arrays.fill(nioBuffers, null);
}
// reset flushed, unflushed and tail
// See https://github.com/netty/netty/issues/1772
flushed = 0;
unflushed = 0;
tail = 0;
// Set the channel to null so it can be GC'ed ASAP
channel = null;
RECYCLER.recycle(this, handle);
} }
public long totalPendingWriteBytes() { public long totalPendingWriteBytes() {
return totalPendingSize; return totalPendingSize;
} }
private static final class Entry { static final class Entry {
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@Override
protected Entry newObject(Handle handle) {
return new Entry(handle);
}
};
private final Handle handle;
Entry next;
Object msg; Object msg;
ByteBuffer[] buffers; ByteBuffer[] buffers;
ByteBuffer buf; ByteBuffer buf;
@ -647,7 +562,20 @@ public final class ChannelOutboundBuffer {
int count = -1; int count = -1;
boolean cancelled; boolean cancelled;
public int cancel() { private Entry(Handle handle) {
this.handle = handle;
}
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
Entry entry = RECYCLER.get();
entry.msg = msg;
entry.pendingSize = size;
entry.total = total;
entry.promise = promise;
return entry;
}
int cancel() {
if (!cancelled) { if (!cancelled) {
cancelled = true; cancelled = true;
int pSize = pendingSize; int pSize = pendingSize;
@ -666,7 +594,9 @@ public final class ChannelOutboundBuffer {
return 0; return 0;
} }
public void clear() { Entry recycleAndGetNext() {
Entry e = next;
next = null;
buffers = null; buffers = null;
buf = null; buf = null;
msg = null; msg = null;
@ -676,6 +606,8 @@ public final class ChannelOutboundBuffer {
pendingSize = 0; pendingSize = 0;
count = -1; count = -1;
cancelled = false; cancelled = false;
RECYCLER.recycle(this, handle);
return e;
} }
} }

View File

@ -31,7 +31,7 @@ public class ChannelOutboundBufferTest {
@Test @Test
public void testEmptyNioBuffers() { public void testEmptyNioBuffers() {
TestChannel channel = new TestChannel(); TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
assertEquals(0, buffer.nioBufferCount()); assertEquals(0, buffer.nioBufferCount());
ByteBuffer[] buffers = buffer.nioBuffers(); ByteBuffer[] buffers = buffer.nioBuffers();
assertNotNull(buffers); assertNotNull(buffers);
@ -46,28 +46,18 @@ public class ChannelOutboundBufferTest {
public void testNioBuffersSingleBacked() { public void testNioBuffersSingleBacked() {
TestChannel channel = new TestChannel(); TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
assertEquals(0, buffer.nioBufferCount());
ByteBuffer[] buffers = buffer.nioBuffers();
assertNotNull(buffers);
for (ByteBuffer b: buffers) {
assertNull(b);
}
assertEquals(0, buffer.nioBufferCount()); assertEquals(0, buffer.nioBufferCount());
ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII); ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII);
ByteBuffer nioBuf = buf.internalNioBuffer(0, buf.readableBytes()); ByteBuffer nioBuf = buf.internalNioBuffer(0, buf.readableBytes());
buffer.addMessage(buf, channel.voidPromise()); buffer.addMessage(buf, channel.voidPromise());
buffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush(); buffer.addFlush();
buffers = buffer.nioBuffers(); ByteBuffer[] buffers = buffer.nioBuffers();
assertNotNull(buffers); assertNotNull(buffers);
assertEquals("Should still be 0 as not flushed yet", 1, buffer.nioBufferCount()); assertEquals("Should still be 0 as not flushed yet", 1, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) { for (int i = 0; i < buffer.nioBufferCount(); i++) {
if (i == 0) { if (i == 0) {
assertEquals(buffers[i], nioBuf); assertEquals(buffers[i], nioBuf);
} else { } else {
@ -81,22 +71,17 @@ public class ChannelOutboundBufferTest {
public void testNioBuffersExpand() { public void testNioBuffersExpand() {
TestChannel channel = new TestChannel(); TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII)); ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
for (int i = 0; i < 64; i++) { for (int i = 0; i < 64; i++) {
buffer.addMessage(buf.copy(), channel.voidPromise()); buffer.addMessage(buf.copy(), channel.voidPromise());
} }
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush(); buffer.addFlush();
buffers = buffer.nioBuffers(); ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(64, buffers.length);
assertEquals(64, buffer.nioBufferCount()); assertEquals(64, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) { for (int i = 0; i < buffer.nioBufferCount(); i++) {
assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes()));
} }
release(buffer); release(buffer);
@ -107,7 +92,7 @@ public class ChannelOutboundBufferTest {
public void testNioBuffersExpand2() { public void testNioBuffersExpand2() {
TestChannel channel = new TestChannel(); TestChannel channel = new TestChannel();
ChannelOutboundBuffer buffer = ChannelOutboundBuffer.newInstance(channel); ChannelOutboundBuffer buffer = new ChannelOutboundBuffer(channel);
CompositeByteBuf comp = compositeBuffer(256); CompositeByteBuf comp = compositeBuffer(256);
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII)); ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
@ -116,16 +101,11 @@ public class ChannelOutboundBufferTest {
} }
buffer.addMessage(comp, channel.voidPromise()); buffer.addMessage(comp, channel.voidPromise());
ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount()); assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
for (ByteBuffer b: buffers) {
assertNull(b);
}
buffer.addFlush(); buffer.addFlush();
buffers = buffer.nioBuffers(); ByteBuffer[] buffers = buffer.nioBuffers();
assertEquals(128, buffers.length);
assertEquals(65, buffer.nioBufferCount()); assertEquals(65, buffer.nioBufferCount());
for (int i = 0; i < buffers.length; i++) { for (int i = 0; i < buffer.nioBufferCount(); i++) {
if (i < 65) { if (i < 65) {
assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes())); assertEquals(buffers[i], buf.internalNioBuffer(0, buf.readableBytes()));
} else { } else {