Use thread local direct buffer for I/O when the current allocator is unpooled
- Allocating and deallocating a direct buffer for I/O is an expensive operation, so we have to at least have a pool of direct buffers if the current allocator is not pooled
This commit is contained in:
parent
ab1a7b9774
commit
11a235ffe9
@ -22,12 +22,15 @@ package io.netty.channel;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.ByteBufHolder;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledDirectByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.util.Recycler;
|
||||
import io.netty.util.Recycler.Handle;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.SystemPropertyUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
@ -47,6 +50,13 @@ public final class ChannelOutboundBuffer {
|
||||
|
||||
private static final int INITIAL_CAPACITY = 32;
|
||||
|
||||
private static final int threadLocalDirectBufferSize;
|
||||
|
||||
static {
|
||||
threadLocalDirectBufferSize = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
|
||||
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize);
|
||||
}
|
||||
|
||||
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
|
||||
@Override
|
||||
protected ChannelOutboundBuffer newObject(Handle handle) {
|
||||
@ -236,9 +246,14 @@ public final class ChannelOutboundBuffer {
|
||||
}
|
||||
|
||||
public Object current() {
|
||||
return current(true);
|
||||
}
|
||||
|
||||
public Object current(boolean preferDirect) {
|
||||
if (isEmpty()) {
|
||||
return null;
|
||||
} else {
|
||||
// TODO: Think of a smart way to handle ByteBufHolder messages
|
||||
Entry entry = buffer[flushed];
|
||||
if (!entry.cancelled && !entry.promise.setUncancellable()) {
|
||||
// Was cancelled so make sure we free up memory and notify about the freed bytes
|
||||
@ -246,7 +261,36 @@ public final class ChannelOutboundBuffer {
|
||||
decrementPendingOutboundBytes(pending);
|
||||
}
|
||||
|
||||
return entry.msg;
|
||||
Object msg = entry.msg;
|
||||
if (threadLocalDirectBufferSize <= 0 || !preferDirect) {
|
||||
return msg;
|
||||
}
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (buf.isDirect()) {
|
||||
return buf;
|
||||
} else {
|
||||
int readableBytes = buf.readableBytes();
|
||||
if (readableBytes == 0) {
|
||||
return buf;
|
||||
}
|
||||
|
||||
// Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
|
||||
// We can do a better job by using our pooled allocator. If the current allocator does not
|
||||
// pool a direct buffer, we use a ThreadLocal based pool.
|
||||
ByteBufAllocator alloc = channel.alloc();
|
||||
ByteBuf directBuf;
|
||||
if (alloc.isDirectBufferPooled()) {
|
||||
directBuf = alloc.directBuffer(readableBytes);
|
||||
} else {
|
||||
directBuf = ThreadLocalPooledByteBuf.newInstance();
|
||||
}
|
||||
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
|
||||
current(directBuf);
|
||||
return directBuf;
|
||||
}
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
}
|
||||
|
||||
@ -354,6 +398,7 @@ public final class ChannelOutboundBuffer {
|
||||
}
|
||||
|
||||
Entry entry = buffer[i];
|
||||
|
||||
if (!entry.cancelled) {
|
||||
if (!entry.promise.setUncancellable()) {
|
||||
// Was cancelled so make sure we free up memory and notify about the freed bytes
|
||||
@ -376,7 +421,7 @@ public final class ChannelOutboundBuffer {
|
||||
this.nioBuffers = nioBuffers =
|
||||
expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
|
||||
}
|
||||
if (buf.isDirect() || !alloc.isDirectBufferPooled()) {
|
||||
if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
|
||||
if (count == 1) {
|
||||
ByteBuffer nioBuf = entry.buf;
|
||||
if (nioBuf == null) {
|
||||
@ -421,7 +466,12 @@ public final class ChannelOutboundBuffer {
|
||||
|
||||
private static int fillBufferArrayNonDirect(Entry entry, ByteBuf buf, int readerIndex, int readableBytes,
|
||||
ByteBufAllocator alloc, ByteBuffer[] nioBuffers, int nioBufferCount) {
|
||||
ByteBuf directBuf = alloc.directBuffer(readableBytes);
|
||||
ByteBuf directBuf;
|
||||
if (alloc.isDirectBufferPooled()) {
|
||||
directBuf = alloc.directBuffer(readableBytes);
|
||||
} else {
|
||||
directBuf = ThreadLocalPooledByteBuf.newInstance();
|
||||
}
|
||||
directBuf.writeBytes(buf, readerIndex, readableBytes);
|
||||
buf.release();
|
||||
entry.msg = directBuf;
|
||||
@ -639,4 +689,35 @@ public final class ChannelOutboundBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf {
|
||||
private final Recycler.Handle<ThreadLocalPooledByteBuf> handle;
|
||||
|
||||
private static final Recycler<ThreadLocalPooledByteBuf> RECYCLER = new Recycler<ThreadLocalPooledByteBuf>() {
|
||||
@Override
|
||||
protected ThreadLocalPooledByteBuf newObject(Handle<ThreadLocalPooledByteBuf> handle) {
|
||||
return new ThreadLocalPooledByteBuf(handle);
|
||||
}
|
||||
};
|
||||
|
||||
private ThreadLocalPooledByteBuf(Recycler.Handle<ThreadLocalPooledByteBuf> handle) {
|
||||
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
|
||||
this.handle = handle;
|
||||
}
|
||||
|
||||
static ThreadLocalPooledByteBuf newInstance() {
|
||||
ThreadLocalPooledByteBuf buf = RECYCLER.get();
|
||||
buf.setRefCnt(1);
|
||||
return buf;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deallocate() {
|
||||
if (capacity() > threadLocalDirectBufferSize) {
|
||||
super.deallocate();
|
||||
} else {
|
||||
clear();
|
||||
RECYCLER.recycle(this, handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user