diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 1c12ece1fd..dc36b71c71 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -22,12 +22,15 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.UnpooledDirectByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -47,6 +50,13 @@ public final class ChannelOutboundBuffer { private static final int INITIAL_CAPACITY = 32; + private static final int threadLocalDirectBufferSize; + + static { + threadLocalDirectBufferSize = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024); + logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize); + } + private static final Recycler RECYCLER = new Recycler() { @Override protected ChannelOutboundBuffer newObject(Handle handle) { @@ -236,9 +246,14 @@ public final class ChannelOutboundBuffer { } public Object current() { + return current(true); + } + + public Object current(boolean preferDirect) { if (isEmpty()) { return null; } else { + // TODO: Think of a smart way to handle ByteBufHolder messages Entry entry = buffer[flushed]; if (!entry.cancelled && !entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes @@ -246,7 +261,36 @@ public final class ChannelOutboundBuffer { decrementPendingOutboundBytes(pending); } - return entry.msg; + Object msg = entry.msg; + if (threadLocalDirectBufferSize <= 0 || !preferDirect) { + return msg; + } + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (buf.isDirect()) { + return buf; + } else { + int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { + return buf; + } + + // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. + // We can do a better job by using our pooled allocator. If the current allocator does not + // pool a direct buffer, we use a ThreadLocal based pool. + ByteBufAllocator alloc = channel.alloc(); + ByteBuf directBuf; + if (alloc.isDirectBufferPooled()) { + directBuf = alloc.directBuffer(readableBytes); + } else { + directBuf = ThreadLocalPooledByteBuf.newInstance(); + } + directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); + current(directBuf); + return directBuf; + } + } + return msg; } } @@ -354,6 +398,7 @@ public final class ChannelOutboundBuffer { } Entry entry = buffer[i]; + if (!entry.cancelled) { if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes @@ -376,7 +421,7 @@ public final class ChannelOutboundBuffer { this.nioBuffers = nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); } - if (buf.isDirect() || !alloc.isDirectBufferPooled()) { + if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { if (count == 1) { ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { @@ -421,7 +466,12 @@ public final class ChannelOutboundBuffer { private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes, ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) { - ByteBuf directBuf = alloc.directBuffer(readableBytes); + ByteBuf directBuf; + if (alloc.isDirectBufferPooled()) { + directBuf = alloc.directBuffer(readableBytes); + } else { + directBuf = ThreadLocalPooledByteBuf.newInstance(); + } directBuf.writeBytes(buf, readerIndex, readableBytes); buf.release(); entry.msg = directBuf; @@ -638,4 +688,36 @@ public final class ChannelOutboundBuffer { cancelled = false; } } + + static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf { + private final Recycler.Handle handle; + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected ThreadLocalPooledByteBuf newObject(Handle handle) { + return new ThreadLocalPooledByteBuf(handle); + } + }; + + private ThreadLocalPooledByteBuf(Recycler.Handle handle) { + super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); + this.handle = handle; + } + + static ThreadLocalPooledByteBuf newInstance() { + ThreadLocalPooledByteBuf buf = RECYCLER.get(); + buf.setRefCnt(1); + return buf; + } + + @Override + protected void deallocate() { + if (capacity() > threadLocalDirectBufferSize) { + super.deallocate(); + } else { + clear(); + RECYCLER.recycle(this, handle); + } + } + } }