From 1fc2ad49ec46d259d16fbfd445bc39f49c5410b0 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 19 Aug 2014 07:59:52 +0200 Subject: [PATCH] Allow efficient writing of CompositeByteBuf when using native epoll transport. Motivation: There were no way to efficient write a CompositeByteBuf as we always did a memory copy to a direct buffer in this case. This is not needed as we can just write a CompositeByteBuf as long as all the components are buffers with a memory address. Modifications: - Write CompositeByteBuf which contains only direct buffers without memory copy - Also handle CompositeByteBuf that have more components then 1024. Result: More efficient writing of CompositeByteBuf. --- .../channel/epoll/EpollSocketChannel.java | 20 +++++++-- .../java/io/netty/channel/epoll/IovArray.java | 41 ++++++++++++++++++- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index d0f098e471..12fb680bfe 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; @@ -392,10 +393,21 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) { - // We can only handle buffers with memory address so we need to copy if a non direct is - // passed to write. - buf = newDirectBuffer(buf); - assert buf.hasMemoryAddress(); + if (buf instanceof CompositeByteBuf) { + // Special handling of CompositeByteBuf to reduce memory copies if some of the Components + // in the CompositeByteBuf are backed by a memoryAddress. + CompositeByteBuf comp = (CompositeByteBuf) buf; + if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) { + // more then 1024 buffers for gathering writes so just do a memory copy. + buf = newDirectBuffer(buf); + assert buf.hasMemoryAddress(); + } + } else { + // We can only handle buffers with memory address so we need to copy if a non direct is + // passed to write. + buf = newDirectBuffer(buf); + assert buf.hasMemoryAddress(); + } } return buf; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java index bb4431870e..e685d9760b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java @@ -16,11 +16,14 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer.MessageProcessor; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; +import java.nio.ByteBuffer; + /** * Represent an array of struct array and so can be passed directly over via JNI without the need to do any more * array copies. @@ -95,6 +98,15 @@ final class IovArray implements MessageProcessor { final long addr = buf.memoryAddress(); final int offset = buf.readerIndex(); + add(addr, offset, len); + return true; + } + + private void add(long addr, int offset, int len) { + if (len == 0) { + // No need to add an empty buffer. + return; + } final long baseOffset = memoryAddress(count++); final long lengthOffset = baseOffset + ADDRESS_SIZE; @@ -110,6 +122,26 @@ final class IovArray implements MessageProcessor { } size += len; + } + + private boolean add(CompositeByteBuf buf) { + ByteBuffer[] buffers = buf.nioBuffers(); + if (count + buffers.length >= Native.IOV_MAX) { + // No more room! + return false; + } + for (int i = 0; i < buffers.length; i++) { + ByteBuffer nioBuffer = buffers[i]; + int offset = nioBuffer.position(); + int len = nioBuffer.limit() - nioBuffer.position(); + if (len == 0) { + // No need to add an empty buffer so just continue + continue; + } + long addr = PlatformDependent.directBufferAddress(nioBuffer); + + add(addr, offset, len); + } return true; } @@ -166,7 +198,14 @@ final class IovArray implements MessageProcessor { @Override public boolean processMessage(Object msg) throws Exception { - return msg instanceof ByteBuf && add((ByteBuf) msg); + if (msg instanceof ByteBuf) { + if (msg instanceof CompositeByteBuf) { + return add((CompositeByteBuf) msg); + } else { + return add((ByteBuf) msg); + } + } + return false; } /**