Allow efficient writing of CompositeByteBuf when using native epoll transport.

Motivation:

There were no way to efficient write a CompositeByteBuf as we always did a memory copy to a direct buffer in this case. This is not needed as we can just write a CompositeByteBuf as long as all the components are buffers with a memory address.

Modifications:

- Write CompositeByteBuf which contains only direct buffers without memory copy
- Also handle CompositeByteBuf that have more components then 1024.

Result:

More efficient writing of CompositeByteBuf.
This commit is contained in:
Norman Maurer 2014-08-19 07:59:52 +02:00
parent ed50eb8781
commit 1fc2ad49ec
2 changed files with 56 additions and 5 deletions

View File

@ -17,6 +17,7 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
@ -392,10 +393,21 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
if (buf instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) buf;
if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
// more then 1024 buffers for gathering writes so just do a memory copy.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
} else {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
}
return buf;
}

View File

@ -16,11 +16,14 @@
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
/**
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
* array copies.
@ -95,6 +98,15 @@ final class IovArray implements MessageProcessor {
final long addr = buf.memoryAddress();
final int offset = buf.readerIndex();
add(addr, offset, len);
return true;
}
private void add(long addr, int offset, int len) {
if (len == 0) {
// No need to add an empty buffer.
return;
}
final long baseOffset = memoryAddress(count++);
final long lengthOffset = baseOffset + ADDRESS_SIZE;
@ -110,6 +122,26 @@ final class IovArray implements MessageProcessor {
}
size += len;
}
private boolean add(CompositeByteBuf buf) {
ByteBuffer[] buffers = buf.nioBuffers();
if (count + buffers.length >= Native.IOV_MAX) {
// No more room!
return false;
}
for (int i = 0; i < buffers.length; i++) {
ByteBuffer nioBuffer = buffers[i];
int offset = nioBuffer.position();
int len = nioBuffer.limit() - nioBuffer.position();
if (len == 0) {
// No need to add an empty buffer so just continue
continue;
}
long addr = PlatformDependent.directBufferAddress(nioBuffer);
add(addr, offset, len);
}
return true;
}
@ -166,7 +198,14 @@ final class IovArray implements MessageProcessor {
@Override
public boolean processMessage(Object msg) throws Exception {
return msg instanceof ByteBuf && add((ByteBuf) msg);
if (msg instanceof ByteBuf) {
if (msg instanceof CompositeByteBuf) {
return add((CompositeByteBuf) msg);
} else {
return add((ByteBuf) msg);
}
}
return false;
}
/**