Allow efficient writing of CompositeByteBuf when using native epoll transport.
Motivation: There were no way to efficient write a CompositeByteBuf as we always did a memory copy to a direct buffer in this case. This is not needed as we can just write a CompositeByteBuf as long as all the components are buffers with a memory address. Modifications: - Write CompositeByteBuf which contains only direct buffers without memory copy - Also handle CompositeByteBuf that have more components then 1024. Result: More efficient writing of CompositeByteBuf.
This commit is contained in:
parent
a32652197b
commit
ee198d7cfc
@ -17,6 +17,7 @@ package io.netty.channel.epoll;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelConfig;
|
import io.netty.channel.ChannelConfig;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
@ -392,11 +393,22 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
if (msg instanceof ByteBuf) {
|
if (msg instanceof ByteBuf) {
|
||||||
ByteBuf buf = (ByteBuf) msg;
|
ByteBuf buf = (ByteBuf) msg;
|
||||||
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
||||||
|
if (buf instanceof CompositeByteBuf) {
|
||||||
|
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||||
|
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||||
|
CompositeByteBuf comp = (CompositeByteBuf) buf;
|
||||||
|
if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
|
||||||
|
// more then 1024 buffers for gathering writes so just do a memory copy.
|
||||||
|
buf = newDirectBuffer(buf);
|
||||||
|
assert buf.hasMemoryAddress();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
// We can only handle buffers with memory address so we need to copy if a non direct is
|
// We can only handle buffers with memory address so we need to copy if a non direct is
|
||||||
// passed to write.
|
// passed to write.
|
||||||
buf = newDirectBuffer(buf);
|
buf = newDirectBuffer(buf);
|
||||||
assert buf.hasMemoryAddress();
|
assert buf.hasMemoryAddress();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,11 +16,14 @@
|
|||||||
package io.netty.channel.epoll;
|
package io.netty.channel.epoll;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
import io.netty.channel.ChannelOutboundBuffer;
|
import io.netty.channel.ChannelOutboundBuffer;
|
||||||
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
|
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
|
||||||
import io.netty.util.concurrent.FastThreadLocal;
|
import io.netty.util.concurrent.FastThreadLocal;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
|
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
|
||||||
* array copies.
|
* array copies.
|
||||||
@ -95,6 +98,15 @@ final class IovArray implements MessageProcessor {
|
|||||||
|
|
||||||
final long addr = buf.memoryAddress();
|
final long addr = buf.memoryAddress();
|
||||||
final int offset = buf.readerIndex();
|
final int offset = buf.readerIndex();
|
||||||
|
add(addr, offset, len);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void add(long addr, int offset, int len) {
|
||||||
|
if (len == 0) {
|
||||||
|
// No need to add an empty buffer.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final long baseOffset = memoryAddress(count++);
|
final long baseOffset = memoryAddress(count++);
|
||||||
final long lengthOffset = baseOffset + ADDRESS_SIZE;
|
final long lengthOffset = baseOffset + ADDRESS_SIZE;
|
||||||
@ -110,6 +122,26 @@ final class IovArray implements MessageProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
size += len;
|
size += len;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean add(CompositeByteBuf buf) {
|
||||||
|
ByteBuffer[] buffers = buf.nioBuffers();
|
||||||
|
if (count + buffers.length >= Native.IOV_MAX) {
|
||||||
|
// No more room!
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < buffers.length; i++) {
|
||||||
|
ByteBuffer nioBuffer = buffers[i];
|
||||||
|
int offset = nioBuffer.position();
|
||||||
|
int len = nioBuffer.limit() - nioBuffer.position();
|
||||||
|
if (len == 0) {
|
||||||
|
// No need to add an empty buffer so just continue
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
long addr = PlatformDependent.directBufferAddress(nioBuffer);
|
||||||
|
|
||||||
|
add(addr, offset, len);
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,7 +198,14 @@ final class IovArray implements MessageProcessor {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean processMessage(Object msg) throws Exception {
|
public boolean processMessage(Object msg) throws Exception {
|
||||||
return msg instanceof ByteBuf && add((ByteBuf) msg);
|
if (msg instanceof ByteBuf) {
|
||||||
|
if (msg instanceof CompositeByteBuf) {
|
||||||
|
return add((CompositeByteBuf) msg);
|
||||||
|
} else {
|
||||||
|
return add((ByteBuf) msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user