diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java index ac5a6a99e2..9ced16c185 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java @@ -118,4 +118,9 @@ public interface ByteBufAllocator { * Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it. */ CompositeByteBuf compositeDirectBuffer(int maxNumComponents); + + /** + * Returns {@code true} if direct {@link ByteBuf}'s are pooled + */ + boolean isDirectBufferPooled(); } diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index 82ee239579..ab5d14dd8b 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -240,6 +240,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { } } + @Override + public boolean isDirectBufferPooled() { + return directArenas != null; + } + public String toString() { StringBuilder buf = new StringBuilder(); buf.append(heapArenas.length); diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java index 081f753451..90fcb10b15 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java @@ -51,4 +51,9 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator { return new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } + + @Override + public boolean isDirectBufferPooled() { + return false; + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index 36e513b898..5dd52731e8 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -20,10 +20,12 @@ import com.sun.nio.sctp.MessageInfo; import com.sun.nio.sctp.NotificationHandler; import com.sun.nio.sctp.SctpChannel; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioMessageChannel; @@ -292,7 +294,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { SctpMessage packet = (SctpMessage) msg; ByteBuf data = packet.content(); int dataLen = data.readableBytes(); @@ -300,13 +302,19 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett return true; } + ByteBufAllocator alloc = alloc(); + boolean needsCopy = data.nioBufferCount() != 1; + if (!needsCopy) { + if (!data.isDirect() && alloc.isDirectBufferPooled()) { + needsCopy = true; + } + } ByteBuffer nioData; - if (data.nioBufferCount() == 1) { + if (!needsCopy) { nioData = data.nioBuffer(); } else { - nioData = ByteBuffer.allocate(dataLen); - data.getBytes(data.readerIndex(), nioData); - nioData.flip(); + data = alloc.directBuffer(dataLen).writeBytes(data); + nioData = data.nioBuffer(); } final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); @@ -315,7 +323,15 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett final int writtenBytes = javaChannel().send(nioData, mi); - return writtenBytes > 0; + boolean done = writtenBytes > 0; + if (needsCopy) { + if (!done) { + in.current(new SctpMessage(mi, data)); + } else { + in.current(data); + } + } + return done; } @Override diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index 188b08d761..d5d578b8bf 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -20,6 +20,7 @@ import com.sun.nio.sctp.SctpServerChannel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.sctp.DefaultSctpServerChannelConfig; @@ -216,7 +217,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java index aaff81e134..b913a8a257 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtAcceptorChannel.java @@ -18,6 +18,7 @@ package io.netty.channel.udt.nio; import com.barchart.udt.TypeUDT; import com.barchart.udt.nio.ServerSocketChannelUDT; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.udt.DefaultUdtServerChannelConfig; import io.netty.channel.udt.UdtServerChannel; @@ -93,7 +94,7 @@ public abstract class NioUdtAcceptorChannel extends AbstractNioMessageChannel im } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } diff --git a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java index eb135cda75..55cd180e4a 100644 --- a/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java +++ b/transport-udt/src/main/java/io/netty/channel/udt/nio/NioUdtMessageConnectorChannel.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.udt.DefaultUdtChannelConfig; import io.netty.channel.udt.UdtChannel; @@ -166,7 +167,7 @@ public class NioUdtMessageConnectorChannel extends AbstractNioMessageChannel imp } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { // expects a message final UdtMessage message = (UdtMessage) msg; diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 568d0068c4..2462906e99 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -20,6 +20,7 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; @@ -230,6 +231,16 @@ public final class ChannelOutboundBuffer { } } + /** + * Replace the current msg with the given one. + * The replaced msg will automatically be released + */ + public void current(Object msg) { + Entry entry = buffer[flushed]; + safeRelease(entry.msg); + entry.msg = msg; + } + public void progress(long amount) { Entry e = buffer[flushed]; ChannelPromise p = e.promise; @@ -309,7 +320,7 @@ public final class ChannelOutboundBuffer { int nioBufferCount = 0; final int mask = buffer.length - 1; - + final ByteBufAllocator alloc = channel.alloc(); Object m; int i = flushed; while (i != unflushed && (m = buffer[i].msg) != null) { @@ -327,7 +338,7 @@ public final class ChannelOutboundBuffer { if (readableBytes > 0) { nioBufferSize += readableBytes; - if (buf.isDirect()) { + if (buf.isDirect() || !alloc.isDirectBufferPooled()) { int count = buf.nioBufferCount(); if (count == 1) { if (nioBufferCount == nioBuffers.length) { @@ -347,7 +358,7 @@ public final class ChannelOutboundBuffer { } } } else { - ByteBuf directBuf = channel.alloc().directBuffer(readableBytes); + ByteBuf directBuf = alloc.directBuffer(readableBytes); directBuf.writeBytes(buf, readerIndex, readableBytes); buf.release(); buffer[i].msg = directBuf; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 6c4ce70705..a668874e12 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -154,11 +154,21 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; - if (!buf.isReadable()) { + int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { in.remove(); continue; } - + if (!buf.isDirect()) { + ByteBufAllocator alloc = alloc(); + if (alloc.isDirectBufferPooled()) { + // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. + // We can do a better job by using our pooled allocator. If the current allocator does not + // pool a direct buffer, we rely on JDK's direct buffer pool. + buf = alloc.directBuffer(readableBytes).writeBytes(buf); + in.current(buf); + } + } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 70c6d97cea..c674cb5dd0 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -127,7 +127,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - if (doWriteMessage(msg)) { + if (doWriteMessage(msg, in)) { done = true; break; } @@ -155,5 +155,5 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { * * @return {@code true} if and only if the message has been written */ - protected abstract boolean doWriteMessage(Object msg) throws Exception; + protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 24a2b3748e..3cace161a7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -16,13 +16,16 @@ package io.netty.channel.socket.nio; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultAddressedEnvelope; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DatagramChannelConfig; @@ -223,10 +226,10 @@ public final class NioDatagramChannel } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { final Object m; - final ByteBuf data; final SocketAddress remoteAddress; + ByteBuf data; if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope envelope = (AddressedEnvelope) msg; @@ -250,13 +253,19 @@ public final class NioDatagramChannel return true; } + ByteBufAllocator alloc = alloc(); + boolean needsCopy = data.nioBufferCount() != 1; + if (!needsCopy) { + if (!data.isDirect() && alloc.isDirectBufferPooled()) { + needsCopy = true; + } + } ByteBuffer nioData; - if (data.nioBufferCount() == 1) { + if (!needsCopy) { nioData = data.nioBuffer(); } else { - nioData = ByteBuffer.allocate(dataLen); - data.getBytes(data.readerIndex(), nioData); - nioData.flip(); + data = alloc.directBuffer(dataLen).writeBytes(data); + nioData = data.nioBuffer(); } final int writtenBytes; @@ -266,7 +275,24 @@ public final class NioDatagramChannel writtenBytes = javaChannel().write(nioData); } - return writtenBytes > 0; + boolean done = writtenBytes > 0; + if (needsCopy) { + // This means we have allocated a new buffer and need to store it back so we not need to allocate it again + // later + if (remoteAddress == null) { + // remoteAddress is null which means we can handle it as ByteBuf directly + in.current(data); + } else { + if (!done) { + // store it back with all the needed informations + in.current(new DefaultAddressedEnvelope(data, remoteAddress)); + } else { + // Just store back the new create buffer so it is cleaned up once in.remove() is called. + in.current(data); + } + } + } + return done; } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 3faf971281..e294f52b5d 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig; @@ -151,7 +152,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel } @Override - protected boolean doWriteMessage(Object msg) throws Exception { + protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } }