From 25c226a8357d324fb201b1150a3d629f3917d18d Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 16 Aug 2013 21:53:47 +0200 Subject: [PATCH] Make sure only direct ByteBuffer are passed to the underlying jdk Channel. This is needed because of otherwise the JDK itself will do an extra ByteBuffer copy with it's own pool implementation. Even worth it will be done multiple times if the ByteBuffer is always only partial written. With this change the copy is done inside of netty using it's own allocator and only be done one time in all cases. --- .../io/netty/buffer/ByteBufAllocator.java | 5 +++ .../netty/buffer/PooledByteBufAllocator.java | 5 +++ .../buffer/UnpooledByteBufAllocator.java | 5 +++ .../channel/sctp/nio/NioSctpChannel.java | 28 ++++++++++--- .../sctp/nio/NioSctpServerChannel.java | 3 +- .../udt/nio/NioUdtAcceptorChannel.java | 3 +- .../nio/NioUdtMessageConnectorChannel.java | 3 +- .../netty/channel/ChannelOutboundBuffer.java | 17 ++++++-- .../channel/nio/AbstractNioByteChannel.java | 14 ++++++- .../nio/AbstractNioMessageChannel.java | 4 +- .../socket/nio/NioDatagramChannel.java | 40 +++++++++++++++---- .../socket/nio/NioServerSocketChannel.java | 3 +- 12 files changed, 106 insertions(+), 24 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java index ac5a6a99e2..9ced16c185 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java @@ -118,4 +118,9 @@ public interface ByteBufAllocator { * Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it. */ CompositeByteBuf compositeDirectBuffer(int maxNumComponents); + + /** + * Returns {@code true} if direct {@link ByteBuf}'s are pooled + */ + boolean isDirectBufferPooled(); } diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index 82ee239579..ab5d14dd8b 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -240,6 +240,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { } } + @Override + public boolean isDirectBufferPooled() { + return directArenas != null; + } + public String toString() { StringBuilder buf = new StringBuilder(); buf.append(heapArenas.length); diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java index 081f753451..90fcb10b15 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java @@ -51,4 +51,9 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator { return new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } + + @Override + public boolean isDirectBufferPooled() { + return false; + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index 36e513b898..5dd52731e8 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -20,10 +20,12 @@ import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.NotificationHandler; import com.sun.nio.sctp.SctpChannel; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioMessageChannel; @@ -292,7 +294,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { SctpMessage packet = (SctpMessage) msg; ByteBuf data = packet.content(); int dataLen = data.readableBytes(); @@ -300,13 +302,19 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett return true; } + ByteBufAllocator alloc = alloc(); + boolean needsCopy = data.nioBufferCount() != 1; + if (!needsCopy) { + if (!data.isDirect() && alloc.isDirectBufferPooled()) { + needsCopy = true; + } + } ByteBuffer nioData; - if (data.nioBufferCount() == 1) { + if (!needsCopy) { nioData = data.nioBuffer(); } else { - nioData = ByteBuffer.allocate(dataLen); - data.getBytes(data.readerIndex(), nioData); - nioData.flip(); + data = alloc.directBuffer(dataLen).writeBytes(data); + nioData = data.nioBuffer(); } final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); @@ -315,7 +323,15 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett final int writtenBytes = javaChannel().send(nioData, mi); - return writtenBytes > 0; + boolean done = writtenBytes > 0; + if (needsCopy) { + if (!done) { + in.current(new SctpMessage(mi, data)); + } else { + in.current(data); + } + } + return done; } @Override diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index 188b08d761..d5d578b8bf 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -20,6 +20,7 @@ import com.sun.nio.sctp.SctpServerChannel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.sctp.DefaultSctpServerChannelConfig; @@ -216,7 +217,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java index aaff81e134..b913a8a257 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java @@ -18,6 +18,7 @@ package io.netty.channel.udt.nio; import com.barchart.udt.TypeUDT; import com.barchart.udt.nio.ServerSocketChannelUDT; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.udt.DefaultUdtServerChannelConfig; import io.netty.channel.udt.UdtServerChannel; @@ -93,7 +94,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java index eb135cda75..55cd180e4a 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.udt.DefaultUdtChannelConfig; import io.netty.channel.udt.UdtChannel; @@ -166,7 +167,7 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { // expects a message final UdtMessage message = (UdtMessage) msg; diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 568d0068c4..2462906e99 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -20,6 +20,7 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; @@ -230,6 +231,16 @@ public final class ChannelOutboundBuffer { } } + /** + * Replace the current msg with the given one. + * The replaced msg will automatically be released + */ + public void current(Object msg) { + Entry entry = buffer[flushed]; + safeRelease(entry.msg); + entry.msg = msg; + } + public void progress(long amount) { Entry e = buffer[flushed]; ChannelPromise p = e.promise; @@ -309,7 +320,7 @@ public final class ChannelOutboundBuffer { int nioBufferCount = 0; final int mask = buffer.length - 1; - + final ByteBufAllocator alloc = channel.alloc(); Object m; int i = flushed; while (i != unflushed && (m = buffer[i].msg) != null) { @@ -327,7 +338,7 @@ public final class ChannelOutboundBuffer { if (readableBytes > 0) { nioBufferSize += readableBytes; - if (buf.isDirect()) { + if (buf.isDirect() || !alloc.isDirectBufferPooled()) { int count = buf.nioBufferCount(); if (count == 1) { if (nioBufferCount == nioBuffers.length) { @@ -347,7 +358,7 @@ public final class ChannelOutboundBuffer { } } } else { - ByteBuf directBuf = channel.alloc().directBuffer(readableBytes); + ByteBuf directBuf = alloc.directBuffer(readableBytes); directBuf.writeBytes(buf, readerIndex, readableBytes); buf.release(); buffer[i].msg = directBuf; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 6c4ce70705..a668874e12 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -154,11 +154,21 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; - if (!buf.isReadable()) { + int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { in.remove(); continue; } - + if (!buf.isDirect()) { + ByteBufAllocator alloc = alloc(); + if (alloc.isDirectBufferPooled()) { + // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. + // We can do a better job by using our pooled allocator. If the current allocator does not + // pool a direct buffer, we rely on JDK's direct buffer pool. + buf = alloc.directBuffer(readableBytes).writeBytes(buf); + in.current(buf); + } + } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 70c6d97cea..c674cb5dd0 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -127,7 +127,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - if (doWriteMessage(msg)) { + if (doWriteMessage(msg, in)) { done = true; break; } @@ -155,5 +155,5 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { * * @return {@code true} if and only if the message has been written */ - protected abstract boolean doWriteMessage(Object msg) throws Exception; + protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 24a2b3748e..3cace161a7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -16,13 +16,16 @@ package io.netty.channel.socket.nio; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultAddressedEnvelope; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DatagramChannelConfig; @@ -223,10 +226,10 @@ public final class NioDatagramChannel } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { final Object m; - final ByteBuf data; final SocketAddress remoteAddress; + ByteBuf data; if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope envelope = (AddressedEnvelope) msg; @@ -250,13 +253,19 @@ public final class NioDatagramChannel return true; } + ByteBufAllocator alloc = alloc(); + boolean needsCopy = data.nioBufferCount() != 1; + if (!needsCopy) { + if (!data.isDirect() && alloc.isDirectBufferPooled()) { + needsCopy = true; + } + } ByteBuffer nioData; - if (data.nioBufferCount() == 1) { + if (!needsCopy) { nioData = data.nioBuffer(); } else { - nioData = ByteBuffer.allocate(dataLen); - data.getBytes(data.readerIndex(), nioData); - nioData.flip(); + data = alloc.directBuffer(dataLen).writeBytes(data); + nioData = data.nioBuffer(); } final int writtenBytes; @@ -266,7 +275,24 @@ public final class NioDatagramChannel writtenBytes = javaChannel().write(nioData); } - return writtenBytes > 0; + boolean done = writtenBytes > 0; + if (needsCopy) { + // This means we have allocated a new buffer and need to store it back so we not need to allocate it again + // later + if (remoteAddress == null) { + // remoteAddress is null which means we can handle it as ByteBuf directly + in.current(data); + } else { + if (!done) { + // store it back with all the needed informations + in.current(new DefaultAddressedEnvelope(data, remoteAddress)); + } else { + // Just store back the new create buffer so it is cleaned up once in.remove() is called. + in.current(data); + } + } + } + return done; } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 3faf971281..e294f52b5d 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig; @@ -151,7 +152,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } }