Use a Thread-local based direct buffer pool if non pooled allocator is used

This commit is contained in:
Norman Maurer 2013-09-03 11:19:57 +02:00
parent 95576d6559
commit 6716dca17a
6 changed files with 89 additions and 18 deletions

View File

@ -22,10 +22,13 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder; import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle; import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -45,6 +48,13 @@ public final class ChannelOutboundBuffer {
private static final int INITIAL_CAPACITY = 32; private static final int INITIAL_CAPACITY = 32;
private static final int threadLocalDirectBufferSize;
static {
threadLocalDirectBufferSize = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize);
}
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() { private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
@Override @Override
protected ChannelOutboundBuffer newObject(Handle handle) { protected ChannelOutboundBuffer newObject(Handle handle) {
@ -224,10 +234,44 @@ public final class ChannelOutboundBuffer {
} }
public Object current() { public Object current() {
return current(true);
}
public Object current(boolean preferDirect) {
if (isEmpty()) { if (isEmpty()) {
return null; return null;
} else { } else {
return buffer[flushed].msg; // TODO: Think of a smart way to handle ByteBufHolder messages
Object msg = buffer[flushed].msg;
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
return msg;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return buf;
} else {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
return buf;
}
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we use a ThreadLocal based pool.
ByteBufAllocator alloc = channel.alloc();
ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
current(directBuf);
return directBuf;
}
}
return msg;
} }
} }
@ -339,7 +383,7 @@ public final class ChannelOutboundBuffer {
if (nioBufferCount + count > nioBuffers.length) { if (nioBufferCount + count > nioBuffers.length) {
this.nioBuffers = nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCount); this.nioBuffers = nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCount);
} }
if (buf.isDirect() || !alloc.isDirectBufferPooled()) { if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
if (buf.nioBufferCount() == 1) { if (buf.nioBufferCount() == 1) {
nioBuffers[nioBufferCount ++] = buf.internalNioBuffer(readerIndex, readableBytes); nioBuffers[nioBufferCount ++] = buf.internalNioBuffer(readerIndex, readableBytes);
} else { } else {
@ -371,7 +415,12 @@ public final class ChannelOutboundBuffer {
private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes, private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes,
ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) { ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) {
ByteBuf directBuf = alloc.directBuffer(readableBytes); ByteBuf directBuf;
if (alloc.isDirectBufferPooled()) {
directBuf = alloc.directBuffer(readableBytes);
} else {
directBuf = ThreadLocalPooledByteBuf.newInstance();
}
directBuf.writeBytes(buf, readerIndex, readableBytes); directBuf.writeBytes(buf, readerIndex, readableBytes);
buf.release(); buf.release();
entry.msg = directBuf; entry.msg = directBuf;
@ -543,4 +592,35 @@ public final class ChannelOutboundBuffer {
} }
} }
static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf {
private final Recycler.Handle handle;
private static final Recycler<ThreadLocalPooledByteBuf> RECYCLER = new Recycler<ThreadLocalPooledByteBuf>() {
@Override
protected ThreadLocalPooledByteBuf newObject(Handle handle) {
return new ThreadLocalPooledByteBuf(handle);
}
};
private ThreadLocalPooledByteBuf(Recycler.Handle handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
static ThreadLocalPooledByteBuf newInstance() {
ThreadLocalPooledByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
return buf;
}
@Override
protected void deallocate() {
if (capacity() > threadLocalDirectBufferSize) {
super.deallocate();
} else {
clear();
RECYCLER.recycle(this, handle);
}
}
}
} }

View File

@ -320,7 +320,7 @@ public class EmbeddedChannel extends AbstractChannel {
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) { for (;;) {
Object msg = in.current(); Object msg = in.current(false);
if (msg == null) { if (msg == null) {
break; break;
} }

View File

@ -283,7 +283,7 @@ public class LocalChannel extends AbstractChannel {
// Use a copy because the original msgs will be recycled by AbstractChannel. // Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()]; final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) { for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current()); msgsCopy[i] = ReferenceCountUtil.retain(in.current(false));
in.remove(); in.remove();
} }

View File

@ -145,7 +145,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
int writeSpinCount = -1; int writeSpinCount = -1;
for (;;) { for (;;) {
Object msg = in.current(); Object msg = in.current(true);
if (msg == null) { if (msg == null) {
// Wrote all messages. // Wrote all messages.
clearOpWrite(); clearOpWrite();
@ -159,16 +159,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
in.remove(); in.remove();
continue; continue;
} }
if (!buf.isDirect()) {
ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
// We can do a better job by using our pooled allocator. If the current allocator does not
// pool a direct buffer, we rely on JDK's direct buffer pool.
buf = alloc.directBuffer(readableBytes).writeBytes(buf);
in.current(buf);
}
}
boolean done = false; boolean done = false;
long flushedAmount = 0; long flushedAmount = 0;
if (writeSpinCount == -1) { if (writeSpinCount == -1) {

View File

@ -155,7 +155,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) { for (;;) {
Object msg = in.current(); Object msg = in.current(false);
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;

View File

@ -235,7 +235,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) { for (;;) {
final Object o = in.current(); final Object o = in.current(false);
if (o == null) { if (o == null) {
break; break;
} }